[spark] branch master updated (5825db81e00 -> 7ad1c80f281)
This is an automated email from the ASF dual-hosted git repository. srowen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 5825db81e00 [SPARK-42052][SQL] Codegen Support for HiveSimpleUDF add 7ad1c80f281 [SPARK-42880][DOCS] Update running-on-yarn.md to log4j2 syntax No new revisions were added by this update. Summary of changes: docs/running-on-yarn.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-42052][SQL] Codegen Support for HiveSimpleUDF
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5825db81e00 [SPARK-42052][SQL] Codegen Support for HiveSimpleUDF 5825db81e00 is described below commit 5825db81e0059a4895b4f59d57dec67b0bc618b4 Author: panbingkun AuthorDate: Wed Mar 22 12:14:58 2023 +0800 [SPARK-42052][SQL] Codegen Support for HiveSimpleUDF ### What changes were proposed in this pull request? - As a subtask of [SPARK-42050](https://issues.apache.org/jira/browse/SPARK-42050), this PR adds Codegen Support for HiveSimpleUDF - Extract a `HiveUDFEvaluatorBase` class for the common behaviors of HiveSimpleUDFEvaluator & HiveGenericUDFEvaluator. ### Why are the changes needed? - Improve codegen coverage and performance. - Following https://github.com/apache/spark/pull/39949. Make the code more concise. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Add new UT. Pass GA. Closes #40397 from panbingkun/refactor_HiveSimpleUDF. Authored-by: panbingkun Signed-off-by: Wenchen Fan --- .../apache/spark/sql/hive/hiveUDFEvaluators.scala | 148 + .../scala/org/apache/spark/sql/hive/hiveUDFs.scala | 147 ++-- .../spark/sql/hive/execution/HiveUDFSuite.scala| 42 ++ 3 files changed, 232 insertions(+), 105 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFEvaluators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFEvaluators.scala new file mode 100644 index 000..094f8ba7a0f --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFEvaluators.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, UDF} +import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType} +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._ +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory} +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper +import org.apache.spark.sql.types.DataType + +abstract class HiveUDFEvaluatorBase[UDFType <: AnyRef]( +funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) + extends HiveInspectors with Serializable { + + @transient + lazy val function = funcWrapper.createFunction[UDFType]() + + @transient + lazy val isUDFDeterministic = { +val udfType = function.getClass.getAnnotation(classOf[HiveUDFType]) +udfType != null && udfType.deterministic() && !udfType.stateful() + } + + def returnType: DataType + + def setArg(index: Int, arg: Any): Unit + + def doEvaluate(): Any + + final def evaluate(): Any = { +try { + 
doEvaluate() +} catch { + case e: Throwable => +throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError( + s"${funcWrapper.functionClassName}", + s"${children.map(_.dataType.catalogString).mkString(", ")}", + s"${returnType.catalogString}", + e) +} + } +} + +class HiveSimpleUDFEvaluator( +funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) + extends HiveUDFEvaluatorBase[UDF](funcWrapper, children) { + + @transient + lazy val method = function.getResolver. +getEvalMethod(children.map(_.dataType.toTypeInfo).asJava) + + @transient + private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray + + @transient + private lazy val arguments = children.map(toInspector).toArray + + // Create parameter converters + @transient +
[spark] branch master updated: [SPARK-42786][CONNECT] Typed Select
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 01dc5e6abaf [SPARK-42786][CONNECT] Typed Select 01dc5e6abaf is described below commit 01dc5e6abafeb0bee2049d3e9da73fb89703d958 Author: Zhen Li AuthorDate: Tue Mar 21 23:34:08 2023 -0400 [SPARK-42786][CONNECT] Typed Select ### What changes were proposed in this pull request? Implement typed select methods in the Dataset. ### Why are the changes needed? More APIs ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit and E2E tests Closes #40413 from zhenlineo/select-typed. Authored-by: Zhen Li Signed-off-by: Herman van Hovell --- .../main/scala/org/apache/spark/sql/Dataset.scala | 79 ++-- .../org/apache/spark/sql/ClientE2ETestSuite.scala | 31 +++- .../apache/spark/sql/PlanGenerationTestSuite.scala | 35 + .../CheckConnectJvmClientCompatibility.scala | 3 +- .../explain-results/select_typed_2-arg.explain | 2 + .../explain-results/select_typed_3-arg.explain | 2 + .../explain-results/select_typed_4-arg.explain | 2 + .../explain-results/select_typed_5-arg.explain | 2 + .../query-tests/queries/select_typed_2-arg.json| 42 +++ .../queries/select_typed_2-arg.proto.bin | Bin 0 -> 101 bytes .../query-tests/queries/select_typed_3-arg.json| 55 ++ .../queries/select_typed_3-arg.proto.bin | Bin 0 -> 128 bytes .../query-tests/queries/select_typed_4-arg.json| 68 + .../queries/select_typed_4-arg.proto.bin | Bin 0 -> 157 bytes .../query-tests/queries/select_typed_5-arg.json| 81 + .../queries/select_typed_5-arg.proto.bin | Bin 0 -> 182 bytes .../sql/catalyst/encoders/AgnosticEncoder.scala| 14 17 files changed, 407 insertions(+), 9 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index fdc994b2d90..cc9f66a8ba0 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -25,7 +25,7 @@ import scala.util.control.NonFatal import org.apache.spark.annotation.DeveloperApi import org.apache.spark.connect.proto import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder -import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveLongEncoder, StringEncoder, UnboundRowEncoder} +import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveLongEncoder, ProductEncoder, StringEncoder, UnboundRowEncoder} import org.apache.spark.sql.catalyst.expressions.RowOrdering import org.apache.spark.sql.connect.client.SparkResult import org.apache.spark.sql.connect.common.DataTypeProtoConverter @@ -1020,11 +1020,8 @@ class Dataset[T] private[sql] ( * @since 3.4.0 */ @scala.annotation.varargs - def select(cols: Column*): DataFrame = sparkSession.newDataFrame { builder => -builder.getProjectBuilder - .setInput(plan.getRoot) - .addAllExpressions(cols.map(_.expr).asJava) - } + def select(cols: Column*): DataFrame = +selectUntyped(UnboundRowEncoder, cols).asInstanceOf[DataFrame] /** * Selects a set of columns. This is a variant of `select` that can only select existing columns @@ -1084,6 +1081,76 @@ class Dataset[T] private[sql] ( } } + /** + * Internal helper function for building typed selects that return tuples. For simplicity and + * code reuse, we do this without the help of the type system and then use helper functions that + * cast appropriately for the user facing interface. + */ + private def selectUntyped(columns: TypedColumn[_, _]*): Dataset[_] = { +val encoder = ProductEncoder.tuple(columns.map(_.encoder)) +selectUntyped(encoder, columns) + } + + /** + * Internal helper function for all select methods. 
The only difference between the select + * methods and typed select methods is the encoder used to build the return dataset. + */ + private def selectUntyped(encoder: AgnosticEncoder[_], cols: Seq[Column]): Dataset[_] = { +sparkSession.newDataset(encoder) { builder => + builder.getProjectBuilder +.setInput(plan.getRoot) +.addAllExpressions(cols.map(_.expr).asJava) +} + } + + /** + * Returns a new Dataset by computing the given [[Column]] expressions for each element. + * + * @group typedrel + * @since 3.4.0 + */ + def select[U1, U2](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2]): Dataset[(U1, U2)] = +selectUntyped(c1, c2).asInstanceOf[Dataset[(U1, U2)]]
[spark] branch master updated (76e4425efc2 -> 8b436b3ab00)
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 76e4425efc2 [SPARK-42753] ReusedExchange refers to non-existent nodes add 8b436b3ab00 [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks No new revisions were added by this update. Summary of changes: .../org/apache/spark/scheduler/DAGScheduler.scala | 12 +- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 212 + 2 files changed, 220 insertions(+), 4 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-42753] ReusedExchange refers to non-existent nodes
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 76e4425efc2 [SPARK-42753] ReusedExchange refers to non-existent nodes 76e4425efc2 is described below commit 76e4425efc22218fba04ad0aba8e6c1f6bb4954a Author: Steven Chen AuthorDate: Wed Mar 22 09:28:18 2023 +0800 [SPARK-42753] ReusedExchange refers to non-existent nodes ### What changes were proposed in this pull request? This PR addresses a rare bug with the EXPLAIN function and Spark UI that can happen when AQE takes effect with multiple ReusedExchange nodes. The bug causes the ReusedExchange to point to an unknown child since that child subtree was "pruned" in a previous AQE iteration. This PR fixes the issue by finding all the ReusedExchange nodes in the tree that have a `child` node that has NOT been processed in the final plan (meaning it has no ID or it has an incorrect ID generated from the previous AQE iteration). It then traverses the child subtree and generates correct IDs for them. We print this missing subtree in a new section called `Adaptively Optimized Out Exchanges`. ### Why are the changes needed? 
Below is an example to demonstrate the root cause: > AdaptiveSparkPlan > |-- SomeNode X (subquery xxx) > |-- Exchange A > |-- SomeNode Y > |-- Exchange B > > Subquery:Hosting operator = SomeNode Hosting Expression = xxx dynamicpruning#388 > AdaptiveSparkPlan > |-- SomeNode M > |-- Exchange C > |-- SomeNode N > |-- Exchange D > Step 1: Exchange B is materialized and the QueryStage is added to stage cache Step 2: Exchange D reuses Exchange B Step 3: Exchange C is materialized and the QueryStage is added to stage cache Step 4: Exchange A reuses Exchange C Then the final plan looks like: > AdaptiveSparkPlan > |-- SomeNode X (subquery xxx) > |-- Exchange A -> ReusedExchange (reuses Exchange C) > > > Subquery:Hosting operator = SomeNode Hosting Expression = xxx dynamicpruning#388 > AdaptiveSparkPlan > |-- SomeNode M > |-- Exchange C -> PhotonShuffleMapStage > |-- SomeNode N > |-- Exchange D -> ReusedExchange (reuses Exchange B) > As a result, the ReusedExchange (reuses Exchange B) will refer to a non-existent node. ### Does this PR introduce _any_ user-facing change? 
**Explain Text Before and After** **Before:** ``` +- ReusedExchange (105) (105) ReusedExchange [Reuses operator id: unknown] Output [3]: [sr_customer_sk#303, sr_store_sk#307, sum#413L] ``` **After:** ``` +- ReusedExchange (105) +- Exchange (132) +- * HashAggregate (131) +- * Project (130) +- * BroadcastHashJoin Inner BuildRight (129) :- * Filter (128) : +- * ColumnarToRow (127) : +- Scan parquet hive_metastore.tpcds_sf1000_delta.store_returns (126) +- ShuffleQueryStage (115), Statistics(sizeInBytes=5.7 KiB, rowCount=366, [d_date_sk#234 -> ColumnStat(Some(362),Some(2415022),Some(2488070),Some(0),Some(4),Some(4),None,2)], isRuntime=true) +- ReusedExchange (114) (105) ReusedExchange [Reuses operator id: 132] Output [3]: [sr_customer_sk#217, sr_store_sk#221, sum#327L] (126) Scan parquet hive_metastore.tpcds_sf1000_delta.store_returns Output [4]: [sr_customer_sk#217, sr_store_sk#221, sr_return_amt#225, sr_returned_date_sk#214] Batched: true Location: PreparedDeltaFileIndex [dbfs:/mnt/performance-datasets/2018TPC/tpcds-2.4/sf1000_delta/store_returns] PartitionFilters: [isnotnull(sr_returned_date_sk#214), dynamicpruningexpression(sr_returned_date_sk#214 IN dynamicpruning#329)] PushedFilters: [IsNotNull(sr_store_sk)] ReadSchema: struct (127) ColumnarToRow Input [4]: [sr_customer_sk#217, sr_store_sk#221, sr_return_amt#225, sr_returned_date_sk#214] (128) Filter Input [4]: [sr_customer_sk#217, sr_store_sk#221, sr_return_amt#225, sr_returned_date_sk#214] Condition : isnotnull(sr_store_sk#221) (114) ReusedExchange [Reuses operator id: 8] Output [1]: [d_date_sk#234] (115) ShuffleQueryStage Output [1]: [d_date_sk#234] Arguments: 2, Statistics(sizeInBytes=5.7 KiB, rowCount=366, [d_date_sk#234 -> ColumnStat(Some(362),Some(2415022),Some(2488070),Some(0),Some(4),Some(4),None,2)], isRuntime=true) (129) BroadcastHashJoin Left keys [1]: [sr_returned_date_sk#214] Right keys [1]: [d_date_sk#234] Join type: Inner Join condition: None
[spark] branch master updated: [MINOR][DOCS] Fix typos
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7f45285c821 [MINOR][DOCS] Fix typos 7f45285c821 is described below commit 7f45285c8217429cced2854cdf98512bb3208de4 Author: sudoliyang AuthorDate: Wed Mar 22 10:20:13 2023 +0900 [MINOR][DOCS] Fix typos ### What changes were proposed in this pull request? Fix typos in the repo. ### Why are the changes needed? Improve readability. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? No tests are needed. Closes #40494 from sudoliyang/fix_typos. Authored-by: sudoliyang Signed-off-by: Hyukjin Kwon --- R/pkg/R/functions.R | 2 +- .../src/main/java/org/apache/spark/network/util/NettyUtils.java | 2 +- core/src/main/scala/org/apache/spark/ui/UIUtils.scala | 2 +- docs/sql-data-sources-jdbc.md | 2 +- docs/sql-migration-guide.md | 2 +- docs/sql-ref-syntax-aux-conf-mgmt-reset.md | 2 +- .../main/scala/org/apache/spark/ml/classification/LinearSVC.scala | 2 +- .../org/apache/spark/ml/regression/AFTSurvivalRegression.scala | 2 +- .../scala/org/apache/spark/ml/regression/LinearRegression.scala | 2 +- mllib/core/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala | 4 ++-- python/pyspark/broadcast.py | 2 +- python/pyspark/ml/functions.py | 2 +- python/pyspark/ml/linalg/__init__.py| 2 +- python/pyspark/ml/stat.py | 4 ++-- python/pyspark/mllib/linalg/__init__.py | 2 +- python/pyspark/rdd.py | 2 +- python/pyspark/sql/functions.py | 6 +++--- python/pyspark/worker.py| 2 +- .../main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 2 +- .../spark/sql/execution/datasources/orc/OrcColumnStatistics.java| 2 +- .../org/apache/spark/sql/execution/streaming/AsyncCommitLog.scala | 2 +- .../apache/spark/sql/execution/streaming/AsyncOffsetSeqLog.scala| 2 +- 22 files changed, 26 insertions(+), 26 deletions(-) diff 
--git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 00ce630bd18..8fc5be17fde 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -259,7 +259,7 @@ NULL #' @param finish an unary \code{function} \code{(Column) -> Column} used to #' apply final transformation on the accumulated data in \code{array_aggregate}. #' @param comparator an optional binary (\code{(Column, Column) -> Column}) \code{function} -#' which is used to compare the elemnts of the array. +#' which is used to compare the elements of the array. #' The comparator will take two #' arguments representing two elements of the array. It returns a negative integer, #' 0, or a positive integer as the first element is less than, equal to, diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java index cc4657efe39..d8f720e98e3 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java @@ -181,7 +181,7 @@ public class NettyUtils { } /** - * ByteBuf allocator prefers to allocate direct ByteBuf iif both Spark allows to create direct + * ByteBuf allocator prefers to allocate direct ByteBuf if both Spark allows to create direct * ByteBuf and Netty enables directBufferPreferred. */ public static boolean preferDirectBufs(TransportConf conf) { diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 4140de09f95..0ce647d12c5 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -141,7 +141,7 @@ private[spark] object UIUtils extends Logging { * * @param batchTime the batch time to be formatted * @param batchInterval the batch interval - * @param showMMSS if showing the `/MM/dd` part. 
If it's false, the return value wll be + * @param showMMSS if showing the `/MM/dd` part. If it's false, the return value will be * only `HH:mm:ss` or `HH:mm:ss.SSS` depending on `batchInterval` * @param timezone only for test */ diff --git
[spark] branch branch-3.4 updated: [SPARK-42889][CONNECT][PYTHON] Implement cache, persist, unpersist, and storageLevel
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 5622981c885 [SPARK-42889][CONNECT][PYTHON] Implement cache, persist, unpersist, and storageLevel 5622981c885 is described below commit 5622981c88542bd1508a09fd0b45bcfb0f1fc136 Author: Takuya UESHIN AuthorDate: Wed Mar 22 10:18:10 2023 +0900 [SPARK-42889][CONNECT][PYTHON] Implement cache, persist, unpersist, and storageLevel ### What changes were proposed in this pull request? Implements `DataFrame.cache`, `persist`, `unpersist`, and `storageLevel`. ### Why are the changes needed? Missing APIs. ### Does this PR introduce _any_ user-facing change? `DataFrame.cache`, `persist`, `unpersist`, and `storageLevel` will be available. ### How was this patch tested? Added/enabled the related tests. Closes #40510 from ueshin/issues/SPARK-42889/cache. 
Authored-by: Takuya UESHIN Signed-off-by: Hyukjin Kwon (cherry picked from commit b4f02248972c357cc2af6881b10565315ea15cb4) Signed-off-by: Hyukjin Kwon --- .../src/main/protobuf/spark/connect/base.proto | 50 .../common/StorageLevelProtoConverter.scala| 46 .../service/SparkConnectAnalyzeHandler.scala | 33 ++- python/pyspark/sql/connect/client.py | 36 +++ python/pyspark/sql/connect/dataframe.py| 56 +++- python/pyspark/sql/connect/proto/base_pb2.py | 298 ++--- python/pyspark/sql/connect/proto/base_pb2.pyi | 228 python/pyspark/sql/dataframe.py| 20 ++ .../sql/tests/connect/test_connect_basic.py| 3 - python/pyspark/sql/tests/test_dataframe.py | 26 +- python/pyspark/storagelevel.py | 14 +- 11 files changed, 692 insertions(+), 118 deletions(-) diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto index da0f974a749..591f32cea1b 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto @@ -54,6 +54,20 @@ message UserContext { repeated google.protobuf.Any extensions = 999; } +// StorageLevel for persisting Datasets/Tables. +message StorageLevel { + // (Required) Whether the cache should use disk or not. + bool use_disk = 1; + // (Required) Whether the cache should use memory or not. + bool use_memory = 2; + // (Required) Whether the cache should use off-heap or not. + bool use_off_heap = 3; + // (Required) Whether the cached data is deserialized or not. + bool deserialized = 4; + // (Required) The number of replicas. + int32 replication = 5; +} + // Request to perform plan analyze, optionally to explain the plan. 
message AnalyzePlanRequest { // (Required) @@ -82,6 +96,9 @@ message AnalyzePlanRequest { DDLParse ddl_parse = 11; SameSemantics same_semantics = 12; SemanticHash semantic_hash = 13; +Persist persist = 14; +Unpersist unpersist = 15; +GetStorageLevel get_storage_level = 16; } message Schema { @@ -163,6 +180,27 @@ message AnalyzePlanRequest { // (Required) The logical plan to get a hashCode. Plan plan = 1; } + + message Persist { +// (Required) The logical plan to persist. +Relation relation = 1; + +// (Optional) The storage level. +optional StorageLevel storage_level = 2; + } + + message Unpersist { +// (Required) The logical plan to unpersist. +Relation relation = 1; + +// (Optional) Whether to block until all blocks are deleted. +optional bool blocking = 2; + } + + message GetStorageLevel { +// (Required) The logical plan to get the storage level. +Relation relation = 1; + } } // Response to performing analysis of the query. Contains relevant metadata to be able to @@ -181,6 +219,9 @@ message AnalyzePlanResponse { DDLParse ddl_parse = 9; SameSemantics same_semantics = 10; SemanticHash semantic_hash = 11; +Persist persist = 12; +Unpersist unpersist = 13; +GetStorageLevel get_storage_level = 14; } message Schema { @@ -223,6 +264,15 @@ message AnalyzePlanResponse { message SemanticHash { int32 result = 1; } + + message Persist { } + + message Unpersist { } + + message GetStorageLevel { +// (Required) The StorageLevel as a result of get_storage_level request. +StorageLevel storage_level = 1; + } } // A request to be executed by the service. diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/StorageLevelProtoConverter.scala
[spark] branch master updated (da19e0de05b -> b4f02248972)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from da19e0de05b [SPARK-42816][CONNECT] Support Max Message size up to 128MB add b4f02248972 [SPARK-42889][CONNECT][PYTHON] Implement cache, persist, unpersist, and storageLevel No new revisions were added by this update. Summary of changes: .../src/main/protobuf/spark/connect/base.proto | 50 .../common/StorageLevelProtoConverter.scala| 31 ++- .../service/SparkConnectAnalyzeHandler.scala | 33 ++- python/pyspark/sql/connect/client.py | 36 +++ python/pyspark/sql/connect/dataframe.py| 56 +++- python/pyspark/sql/connect/proto/base_pb2.py | 298 ++--- python/pyspark/sql/connect/proto/base_pb2.pyi | 228 python/pyspark/sql/dataframe.py| 20 ++ .../sql/tests/connect/test_connect_basic.py| 3 - python/pyspark/sql/tests/test_dataframe.py | 26 +- python/pyspark/storagelevel.py | 14 +- 11 files changed, 671 insertions(+), 124 deletions(-) copy sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ColumnUtils.scala => connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/StorageLevelProtoConverter.scala (53%) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.4 updated: [SPARK-42816][CONNECT] Support Max Message size up to 128MB
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 61b85aeabe5 [SPARK-42816][CONNECT] Support Max Message size up to 128MB 61b85aeabe5 is described below commit 61b85aeabe50453043b0b206d954b3018b134f6a Author: Martin Grund AuthorDate: Tue Mar 21 18:12:57 2023 -0700 [SPARK-42816][CONNECT] Support Max Message size up to 128MB ### What changes were proposed in this pull request? This change lifts the default message size of 4MB to 128MB and makes it configurable. While 128MB is a "random number" it supports creating DataFrames from reasonably sized local data without failing. ### Why are the changes needed? Usability ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manual Closes #40447 from grundprinzip/SPARK-42816. Lead-authored-by: Martin Grund Co-authored-by: Martin Grund Signed-off-by: Dongjoon Hyun (cherry picked from commit da19e0de05b4fbccae2c21385e67256ff31b1f1a) Signed-off-by: Dongjoon Hyun --- .../apache/spark/sql/connect/client/SparkConnectClient.scala | 1 + .../spark/sql/connect/common/config/ConnectCommon.scala | 1 + .../scala/org/apache/spark/sql/connect/config/Connect.scala | 8 .../spark/sql/connect/service/SparkConnectService.scala | 3 ++- docs/configuration.md| 8 python/pyspark/sql/connect/client.py | 12 +++- python/pyspark/sql/tests/connect/test_connect_basic.py | 9 + 7 files changed, 40 insertions(+), 2 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala index 60d6d202ff5..a298c526883 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala +++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala @@ -406,6 +406,7 @@ private[sql] object SparkConnectClient { if (metadata.nonEmpty) { channelBuilder.intercept(new MetadataHeaderClientInterceptor(metadata)) } + channelBuilder.maxInboundMessageSize(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE) new SparkConnectClient( userContextBuilder.build(), channelBuilder, diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala index 48ae4d2d77f..3f594d79b62 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala @@ -18,4 +18,5 @@ package org.apache.spark.sql.connect.common.config private[connect] object ConnectCommon { val CONNECT_GRPC_BINDING_PORT: Int = 15002 + val CONNECT_GRPC_MAX_MESSAGE_SIZE: Int = 128 * 1024 * 1024; } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala index 64b5bd5d813..19fdad97b5f 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala @@ -47,6 +47,14 @@ object Connect { .bytesConf(ByteUnit.MiB) .createWithDefaultString("4m") + val CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE = +ConfigBuilder("spark.connect.grpc.maxInboundMessageSize") + .doc("Sets the maximum inbound message in bytes size for the gRPC requests." 
+ +"Requests with a larger payload will fail.") + .version("3.4.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE) + val CONNECT_EXTENSIONS_RELATION_CLASSES = ConfigBuilder("spark.connect.extensions.relation.classes") .doc(""" diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index cd353b6ff60..a9442a8c92c 100755 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala @@ -42,7 +42,7 @@ import org.apache.spark.connect.proto import
[spark] branch master updated: [SPARK-42816][CONNECT] Support Max Message size up to 128MB
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new da19e0de05b [SPARK-42816][CONNECT] Support Max Message size up to 128MB da19e0de05b is described below commit da19e0de05b4fbccae2c21385e67256ff31b1f1a Author: Martin Grund AuthorDate: Tue Mar 21 18:12:57 2023 -0700 [SPARK-42816][CONNECT] Support Max Message size up to 128MB ### What changes were proposed in this pull request? This change lifts the default message size of 4MB to 128MB and makes it configurable. While 128MB is a "random number" it supports creating DataFrames from reasonably sized local data without failing. ### Why are the changes needed? Usability ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manual Closes #40447 from grundprinzip/SPARK-42816. Lead-authored-by: Martin Grund Co-authored-by: Martin Grund Signed-off-by: Dongjoon Hyun --- .../apache/spark/sql/connect/client/SparkConnectClient.scala | 1 + .../spark/sql/connect/common/config/ConnectCommon.scala | 1 + .../scala/org/apache/spark/sql/connect/config/Connect.scala | 8 .../spark/sql/connect/service/SparkConnectService.scala | 3 ++- docs/configuration.md| 8 python/pyspark/sql/connect/client.py | 12 +++- python/pyspark/sql/tests/connect/test_connect_basic.py | 9 + 7 files changed, 40 insertions(+), 2 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala index 60d6d202ff5..a298c526883 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala @@ -406,6 +406,7 @@ 
private[sql] object SparkConnectClient { if (metadata.nonEmpty) { channelBuilder.intercept(new MetadataHeaderClientInterceptor(metadata)) } + channelBuilder.maxInboundMessageSize(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE) new SparkConnectClient( userContextBuilder.build(), channelBuilder, diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala index 48ae4d2d77f..3f594d79b62 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/config/ConnectCommon.scala @@ -18,4 +18,5 @@ package org.apache.spark.sql.connect.common.config private[connect] object ConnectCommon { val CONNECT_GRPC_BINDING_PORT: Int = 15002 + val CONNECT_GRPC_MAX_MESSAGE_SIZE: Int = 128 * 1024 * 1024; } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala index 64b5bd5d813..19fdad97b5f 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala @@ -47,6 +47,14 @@ object Connect { .bytesConf(ByteUnit.MiB) .createWithDefaultString("4m") + val CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE = +ConfigBuilder("spark.connect.grpc.maxInboundMessageSize") + .doc("Sets the maximum inbound message in bytes size for the gRPC requests." 
+ +"Requests with a larger payload will fail.") + .version("3.4.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE) + val CONNECT_EXTENSIONS_RELATION_CLASSES = ConfigBuilder("spark.connect.extensions.relation.classes") .doc(""" diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index cd353b6ff60..a9442a8c92c 100755 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala @@ -42,7 +42,7 @@ import org.apache.spark.connect.proto import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse} import org.apache.spark.internal.Logging import
[spark] branch branch-3.4 updated: [SPARK-42888][BUILD] Upgrade `gcs-connector` to 2.2.11
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new ca260cccb15 [SPARK-42888][BUILD] Upgrade `gcs-connector` to 2.2.11 ca260cccb15 is described below commit ca260cccb15d5c28b0a25cf0423723700d343d3c Author: Chris Nauroth AuthorDate: Tue Mar 21 18:00:35 2023 -0700 [SPARK-42888][BUILD] Upgrade `gcs-connector` to 2.2.11 ### What changes were proposed in this pull request? Upgrade the [GCS Connector](https://github.com/GoogleCloudDataproc/hadoop-connectors/tree/v2.2.11/gcs) bundled in the Spark distro from version 2.2.7 to 2.2.11. ### Why are the changes needed? The new release contains multiple bug fixes and enhancements discussed in the [Release Notes](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/v2.2.11/gcs/CHANGES.md). Notable changes include: * Improved socket timeout handling. * Trace logging capabilities. * Fix bug that prevented usage of GCS as a [Hadoop Credential Provider](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/CredentialProviderAPI.html). * Dependency upgrades. * Support OAuth2 based client authentication. ### Does this PR introduce _any_ user-facing change? Distributions built with `-Phadoop-cloud` now include GCS connector 2.2.11 instead of 2.2.7. ``` cnaurothcnauroth-2-1-m:~/spark-3.5.0-SNAPSHOT-bin-custom-spark$ ls -lrt jars/gcs* -rw-r--r-- 1 cnauroth cnauroth 36497606 Mar 21 00:42 jars/gcs-connector-hadoop3-2.2.11-shaded.jar ``` ### How was this patch tested? **Build** I built a custom distro with `-Phadoop-cloud`: ``` ./dev/make-distribution.sh --name custom-spark --pip --tgz -Phadoop-3 -Phadoop-cloud -Pscala-2.12 ``` **Run** I ran a PySpark job that successfully reads and writes using GCS: ``` from pyspark.sql import SparkSession def main() -> None: # Create SparkSession. 
spark = (SparkSession.builder .appName('copy-shakespeare') .getOrCreate()) # Read. df = spark.read.text('gs://dataproc-datasets-us-central1/shakespeare') # Write. df.write.text('gs://cnauroth-hive-metastore-proxy-dist/output/copy-shakespeare') spark.stop() if __name__ == '__main__': main() ``` Authored-by: Chris Nauroth Closes #40511 from cnauroth/SPARK-42888. Authored-by: Chris Nauroth Signed-off-by: Dongjoon Hyun (cherry picked from commit f9017cbe521f7696128b8c9edcb825c79f16768b) Signed-off-by: Dongjoon Hyun --- dev/deps/spark-deps-hadoop-2-hive-2.3 | 2 +- dev/deps/spark-deps-hadoop-3-hive-2.3 | 2 +- pom.xml | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3 index 01e6c814fea..ec382099d24 100644 --- a/dev/deps/spark-deps-hadoop-2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2-hive-2.3 @@ -63,7 +63,7 @@ datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar derby/10.14.2.0//derby-10.14.2.0.jar dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar flatbuffers-java/1.12.0//flatbuffers-java-1.12.0.jar -gcs-connector/hadoop2-2.2.7/shaded/gcs-connector-hadoop2-2.2.7-shaded.jar +gcs-connector/hadoop2-2.2.11/shaded/gcs-connector-hadoop2-2.2.11-shaded.jar gmetric4j/1.0.10//gmetric4j-1.0.10.jar gson/2.2.4//gson-2.2.4.jar guava/14.0.1//guava-14.0.1.jar diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index 4fd81b5f43b..4c834bed077 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -60,7 +60,7 @@ datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar derby/10.14.2.0//derby-10.14.2.0.jar dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar flatbuffers-java/1.12.0//flatbuffers-java-1.12.0.jar -gcs-connector/hadoop3-2.2.7/shaded/gcs-connector-hadoop3-2.2.7-shaded.jar 
+gcs-connector/hadoop3-2.2.11/shaded/gcs-connector-hadoop3-2.2.11-shaded.jar gmetric4j/1.0.10//gmetric4j-1.0.10.jar gson/2.2.4//gson-2.2.4.jar guava/14.0.1//guava-14.0.1.jar diff --git a/pom.xml b/pom.xml index 6d94e06ca78..4f1f93f3492 100644 --- a/pom.xml +++ b/pom.xml @@ -160,7 +160,7 @@ 1.11.655 0.12.8 -hadoop3-2.2.7 +hadoop3-2.2.11 4.5.14 4.4.16 @@ -3514,7 +3514,7 @@ hadoop-client hadoop-yarn-api hadoop-client -hadoop2-2.2.7 +hadoop2-2.2.11 4.3.0 - To unsubscribe,
[spark] branch master updated (f8c0a4a2f24 -> f9017cbe521)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from f8c0a4a2f24 [SPARK-42871][BUILD] Upgrade slf4j to 2.0.7 add f9017cbe521 [SPARK-42888][BUILD] Upgrade `gcs-connector` to 2.2.11 No new revisions were added by this update. Summary of changes: dev/deps/spark-deps-hadoop-2-hive-2.3 | 2 +- dev/deps/spark-deps-hadoop-3-hive-2.3 | 2 +- pom.xml | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (2c962a8d29a -> f8c0a4a2f24)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 2c962a8d29a [SPARK-42662][CONNECT][PS] Add proto message for pandas API on Spark default index add f8c0a4a2f24 [SPARK-42871][BUILD] Upgrade slf4j to 2.0.7 No new revisions were added by this update. Summary of changes: dev/deps/spark-deps-hadoop-2-hive-2.3 | 6 +++--- dev/deps/spark-deps-hadoop-3-hive-2.3 | 6 +++--- pom.xml | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (54afa2bfffb -> 2c962a8d29a)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 54afa2bfffb [MINOR][DOCS] Remove SparkSession constructor invocation in the example add 2c962a8d29a [SPARK-42662][CONNECT][PS] Add proto message for pandas API on Spark default index No new revisions were added by this update. Summary of changes: .../main/protobuf/spark/connect/expressions.proto | 5 + .../sql/connect/planner/SparkConnectPlanner.scala | 7 ++ .../connect/planner/SparkConnectPlannerSuite.scala | 22 +++- python/pyspark/sql/connect/expressions.py | 10 ++ .../pyspark/sql/connect/proto/expressions_pb2.py | 121 - .../pyspark/sql/connect/proto/expressions_pb2.pyi | 18 +++ .../sql/tests/connect/test_connect_column.py | 10 +- 7 files changed, 137 insertions(+), 56 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.4 updated: [MINOR][DOCS] Remove SparkSession constructor invocation in the example
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new d123bc6b09c [MINOR][DOCS] Remove SparkSession constructor invocation in the example d123bc6b09c is described below commit d123bc6b09cfc4f13c3a80ce575eb96c3170f6d4 Author: Hyukjin Kwon AuthorDate: Wed Mar 22 09:19:48 2023 +0900 [MINOR][DOCS] Remove SparkSession constructor invocation in the example ### What changes were proposed in this pull request? This PR proposes to remove the SparkSession constructor invocation in the example. While I am here, I piggyback and add an example of Spark Connect. ### Why are the changes needed? SparkSession's constructor is not meant to be exposed to the end users. It is also hidden on the Scala side. ### Does this PR introduce _any_ user-facing change? Yes, it removes the usage of the SparkSession constructor in the user-facing docs. ### How was this patch tested? Linters should verify the changes in the CI. Closes #40505 from HyukjinKwon/minor-docs-session. Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon (cherry picked from commit 54afa2bfffb46d80cbe9f815a37c3d3325648ef3) Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/session.py | 11 --- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 3b134b551c3..da234abf927 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -179,10 +179,15 @@ class SparkSession(SparkConversionMixin): ... .getOrCreate() ... ) -Create a Spark session from a Spark context. +Create a Spark session with Spark Connect. ->>> sc = spark.sparkContext ->>> spark = SparkSession(sc) +>>> spark = ( +... SparkSession.builder +... .remote("sc://localhost") +... .appName("Word Count") +... .config("spark.some.config.option", "some-value") +... .getOrCreate() +... 
) # doctest: +SKIP """ class Builder: - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (e1e66c570ac -> 54afa2bfffb)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from e1e66c570ac [SPARK-42885][K8S][BUILD] Upgrade `kubernetes-client` to 6.5.1 add 54afa2bfffb [MINOR][DOCS] Remove SparkSession constructor invocation in the example No new revisions were added by this update. Summary of changes: python/pyspark/sql/session.py | 11 --- 1 file changed, 8 insertions(+), 3 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-42885][K8S][BUILD] Upgrade `kubernetes-client` to 6.5.1
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e1e66c570ac [SPARK-42885][K8S][BUILD] Upgrade `kubernetes-client` to 6.5.1 e1e66c570ac is described below commit e1e66c570ac5cb73e7c0b9ffe82974349619e9a0 Author: Dongjoon Hyun AuthorDate: Tue Mar 21 13:53:16 2023 -0700 [SPARK-42885][K8S][BUILD] Upgrade `kubernetes-client` to 6.5.1 ### What changes were proposed in this pull request? This PR aims to upgrade `kubernetes-client` to 6.5.1. ### Why are the changes needed? To bring the latest bug fixes. - https://github.com/fabric8io/kubernetes-client/releases/tag/v6.5.1 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs Closes #40509 from dongjoon-hyun/SPARK-42885. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- dev/deps/spark-deps-hadoop-2-hive-2.3 | 50 +-- dev/deps/spark-deps-hadoop-3-hive-2.3 | 50 +-- pom.xml | 2 +- 3 files changed, 51 insertions(+), 51 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3 index df04d79969d..31bd0b576bf 100644 --- a/dev/deps/spark-deps-hadoop-2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2-hive-2.3 @@ -159,31 +159,31 @@ jsr305/3.0.0//jsr305-3.0.0.jar jta/1.1//jta-1.1.jar jul-to-slf4j/2.0.6//jul-to-slf4j-2.0.6.jar kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar -kubernetes-client-api/6.5.0//kubernetes-client-api-6.5.0.jar -kubernetes-client/6.5.0//kubernetes-client-6.5.0.jar -kubernetes-httpclient-okhttp/6.5.0//kubernetes-httpclient-okhttp-6.5.0.jar -kubernetes-model-admissionregistration/6.5.0//kubernetes-model-admissionregistration-6.5.0.jar -kubernetes-model-apiextensions/6.5.0//kubernetes-model-apiextensions-6.5.0.jar -kubernetes-model-apps/6.5.0//kubernetes-model-apps-6.5.0.jar 
-kubernetes-model-autoscaling/6.5.0//kubernetes-model-autoscaling-6.5.0.jar -kubernetes-model-batch/6.5.0//kubernetes-model-batch-6.5.0.jar -kubernetes-model-certificates/6.5.0//kubernetes-model-certificates-6.5.0.jar -kubernetes-model-common/6.5.0//kubernetes-model-common-6.5.0.jar -kubernetes-model-coordination/6.5.0//kubernetes-model-coordination-6.5.0.jar -kubernetes-model-core/6.5.0//kubernetes-model-core-6.5.0.jar -kubernetes-model-discovery/6.5.0//kubernetes-model-discovery-6.5.0.jar -kubernetes-model-events/6.5.0//kubernetes-model-events-6.5.0.jar -kubernetes-model-extensions/6.5.0//kubernetes-model-extensions-6.5.0.jar -kubernetes-model-flowcontrol/6.5.0//kubernetes-model-flowcontrol-6.5.0.jar -kubernetes-model-gatewayapi/6.5.0//kubernetes-model-gatewayapi-6.5.0.jar -kubernetes-model-metrics/6.5.0//kubernetes-model-metrics-6.5.0.jar -kubernetes-model-networking/6.5.0//kubernetes-model-networking-6.5.0.jar -kubernetes-model-node/6.5.0//kubernetes-model-node-6.5.0.jar -kubernetes-model-policy/6.5.0//kubernetes-model-policy-6.5.0.jar -kubernetes-model-rbac/6.5.0//kubernetes-model-rbac-6.5.0.jar -kubernetes-model-resource/6.5.0//kubernetes-model-resource-6.5.0.jar -kubernetes-model-scheduling/6.5.0//kubernetes-model-scheduling-6.5.0.jar -kubernetes-model-storageclass/6.5.0//kubernetes-model-storageclass-6.5.0.jar +kubernetes-client-api/6.5.1//kubernetes-client-api-6.5.1.jar +kubernetes-client/6.5.1//kubernetes-client-6.5.1.jar +kubernetes-httpclient-okhttp/6.5.1//kubernetes-httpclient-okhttp-6.5.1.jar +kubernetes-model-admissionregistration/6.5.1//kubernetes-model-admissionregistration-6.5.1.jar +kubernetes-model-apiextensions/6.5.1//kubernetes-model-apiextensions-6.5.1.jar +kubernetes-model-apps/6.5.1//kubernetes-model-apps-6.5.1.jar +kubernetes-model-autoscaling/6.5.1//kubernetes-model-autoscaling-6.5.1.jar +kubernetes-model-batch/6.5.1//kubernetes-model-batch-6.5.1.jar +kubernetes-model-certificates/6.5.1//kubernetes-model-certificates-6.5.1.jar 
+kubernetes-model-common/6.5.1//kubernetes-model-common-6.5.1.jar +kubernetes-model-coordination/6.5.1//kubernetes-model-coordination-6.5.1.jar +kubernetes-model-core/6.5.1//kubernetes-model-core-6.5.1.jar +kubernetes-model-discovery/6.5.1//kubernetes-model-discovery-6.5.1.jar +kubernetes-model-events/6.5.1//kubernetes-model-events-6.5.1.jar +kubernetes-model-extensions/6.5.1//kubernetes-model-extensions-6.5.1.jar +kubernetes-model-flowcontrol/6.5.1//kubernetes-model-flowcontrol-6.5.1.jar +kubernetes-model-gatewayapi/6.5.1//kubernetes-model-gatewayapi-6.5.1.jar +kubernetes-model-metrics/6.5.1//kubernetes-model-metrics-6.5.1.jar +kubernetes-model-networking/6.5.1//kubernetes-model-networking-6.5.1.jar +kubernetes-model-node/6.5.1//kubernetes-model-node-6.5.1.jar +kubernetes-model-policy/6.5.1//kubernetes-model-policy-6.5.1.jar
[spark] branch master updated: [SPARK-42813][K8S] Print application info when waitAppCompletion is false
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ba1badd2adf [SPARK-42813][K8S] Print application info when waitAppCompletion is false ba1badd2adf is described below commit ba1badd2adfd175c6680dff90e14b8aaa6cecd78 Author: Cheng Pan AuthorDate: Tue Mar 21 09:08:18 2023 -0700 [SPARK-42813][K8S] Print application info when waitAppCompletion is false ### What changes were proposed in this pull request? On K8s cluster mode, 1. when `spark.kubernetes.submission.waitAppCompletion=false`, print the application information on `spark-submit` exit, as it did before [SPARK-35174](https://issues.apache.org/jira/browse/SPARK-35174) 2. add `appId` in the output message ### Why are the changes needed? On K8s cluster mode, when `spark.kubernetes.submission.waitAppCompletion=false`, before [SPARK-35174](https://issues.apache.org/jira/browse/SPARK-35174), `spark-submit` would exit quickly w/ the basic application information. ``` logInfo(s"Deployed Spark application ${conf.appName} with submission ID $sId into Kubernetes") ``` After [SPARK-35174](https://issues.apache.org/jira/browse/SPARK-35174), that part of the code is unreachable, so nothing is output. This PR also proposes to add `appId` in the output message, to make it consistent w/ the context (if you look at the `LoggingPodStatusWatcherImpl`, this is kind of an exception, `... application $appId ...` is used in other places), and YARN. https://github.com/apache/spark/blob/8860f69455e5a722626194c4797b4b42cccd4510/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L1311-L1318 ### Does this PR introduce _any_ user-facing change? Yes, changes contain 1) when `spark.kubernetes.submission.waitAppCompletion=false`, the user can see the app information when `spark-submit` exits. 
2) the end of `spark-submit` information contains app id now, which is consistent w/ the context and other resource managers like YARN. ### How was this patch tested? Pass CI. Closes #40444 from pan3793/SPARK-42813. Authored-by: Cheng Pan Signed-off-by: Dongjoon Hyun --- .../k8s/submit/KubernetesClientApplication.scala | 9 +- .../k8s/submit/LoggingPodStatusWatcher.scala | 12 .../spark/deploy/k8s/submit/ClientSuite.scala | 32 -- 3 files changed, 43 insertions(+), 10 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 14d3c4d1f42..9f9b5655e26 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -180,8 +180,8 @@ private[spark] class Client( throw e } +val sId = Client.submissionId(conf.namespace, driverPodName) if (conf.get(WAIT_FOR_APP_COMPLETION)) { - val sId = Seq(conf.namespace, driverPodName).mkString(":") breakable { while (true) { val podWithName = kubernetesClient @@ -202,10 +202,17 @@ private[spark] class Client( } } } +} else { + logInfo(s"Deployed Spark application ${conf.appName} with application ID ${conf.appId} " + +s"and submission ID $sId into Kubernetes") } } } +private[spark] object Client { + def submissionId(namespace: String, driverPodName: String): String = s"$namespace:$driverPodName" +} + /** * Main class and entry point of application submission in KUBERNETES mode. 
*/ diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala index 81d91457253..bc8b023b5ec 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala @@ -95,8 +95,9 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf) this.notifyAll() } - override def watchOrStop(sId: String): Boolean = if (conf.get(WAIT_FOR_APP_COMPLETION)) { -logInfo(s"Waiting for application ${conf.appName} with submission ID $sId to finish...") + override def watchOrStop(sId: String): Boolean = { +logInfo(s"Waiting for application
[spark] branch master updated: [SPARK-42808][CORE] Avoid getting availableProcessors every time in `MapOutputTrackerMaster#getStatistics`
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c04e0de0733 [SPARK-42808][CORE] Avoid getting availableProcessors every time in `MapOutputTrackerMaster#getStatistics` c04e0de0733 is described below commit c04e0de073354458f89d30733134a004fe2a25bd Author: sychen AuthorDate: Tue Mar 21 09:57:06 2023 -0500 [SPARK-42808][CORE] Avoid getting availableProcessors every time in `MapOutputTrackerMaster#getStatistics` ### What changes were proposed in this pull request? The return value of `Runtime.getRuntime.availableProcessors` is generally a fixed value. It is not necessary to obtain it every time `getStatistics` is called; reading it once and reusing it avoids a repeated native method call. ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing UT Closes #40440 from cxzl25/SPARK-42808. Authored-by: sychen Signed-off-by: Sean Owen --- core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 5772285a63d..5ad62159d24 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -697,6 +697,8 @@ private[spark] class MapOutputTrackerMaster( pool } + private val availableProcessors = Runtime.getRuntime.availableProcessors() + // Make sure that we aren't going to exceed the max RPC message size by making sure // we use broadcast to send large map output statuses. 
if (minSizeForBroadcast > maxRpcMessageSize) { @@ -966,7 +968,7 @@ private[spark] class MapOutputTrackerMaster( val parallelAggThreshold = conf.get( SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD) val parallelism = math.min( -Runtime.getRuntime.availableProcessors(), +availableProcessors, statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1).toInt if (parallelism <= 1) { statuses.filter(_ != null).foreach { s => - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-42536][BUILD] Upgrade log4j2 to 2.20.0
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new df2e2516188 [SPARK-42536][BUILD] Upgrade log4j2 to 2.20.0 df2e2516188 is described below commit df2e2516188b46537349aa7a5f279de6141c6450 Author: yangjie01 AuthorDate: Tue Mar 21 09:45:32 2023 -0500 [SPARK-42536][BUILD] Upgrade log4j2 to 2.20.0 ### What changes were proposed in this pull request? This PR aims to upgrade log4j2 from 2.19.0 to 2.20.0. ### Why are the changes needed? This version brings some bug fixes, such as [Fix java.sql.Time object formatting in MapMessage](https://issues.apache.org/jira/browse/LOG4J2-2297) and [Fix level propagation in Log4jBridgeHandler](https://issues.apache.org/jira/browse/LOG4J2-3634), and some new features, such as [Add support for timezones in RollingFileAppender](https://issues.apache.org/jira/browse/LOG4J2-1631). The release notes are as follows: - https://logging.apache.org/log4j/2.x/release-notes/2.20.0.html ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions Closes #40490 from LuciferYang/SPARK-42536. 
Lead-authored-by: yangjie01 Co-authored-by: YangJie Signed-off-by: Sean Owen --- dev/deps/spark-deps-hadoop-2-hive-2.3 | 8 dev/deps/spark-deps-hadoop-3-hive-2.3 | 8 pom.xml | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3 index e3d588d36cd..df04d79969d 100644 --- a/dev/deps/spark-deps-hadoop-2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2-hive-2.3 @@ -188,10 +188,10 @@ lapack/3.0.3//lapack-3.0.3.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar libfb303/0.9.3//libfb303-0.9.3.jar libthrift/0.12.0//libthrift-0.12.0.jar -log4j-1.2-api/2.19.0//log4j-1.2-api-2.19.0.jar -log4j-api/2.19.0//log4j-api-2.19.0.jar -log4j-core/2.19.0//log4j-core-2.19.0.jar -log4j-slf4j2-impl/2.19.0//log4j-slf4j2-impl-2.19.0.jar +log4j-1.2-api/2.20.0//log4j-1.2-api-2.20.0.jar +log4j-api/2.20.0//log4j-api-2.20.0.jar +log4j-core/2.20.0//log4j-core-2.20.0.jar +log4j-slf4j2-impl/2.20.0//log4j-slf4j2-impl-2.20.0.jar logging-interceptor/3.12.12//logging-interceptor-3.12.12.jar lz4-java/1.8.0//lz4-java-1.8.0.jar mesos/1.4.3/shaded-protobuf/mesos-1.4.3-shaded-protobuf.jar diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index fd32245ec28..c8d83233bcc 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -172,10 +172,10 @@ lapack/3.0.3//lapack-3.0.3.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar libfb303/0.9.3//libfb303-0.9.3.jar libthrift/0.12.0//libthrift-0.12.0.jar -log4j-1.2-api/2.19.0//log4j-1.2-api-2.19.0.jar -log4j-api/2.19.0//log4j-api-2.19.0.jar -log4j-core/2.19.0//log4j-core-2.19.0.jar -log4j-slf4j2-impl/2.19.0//log4j-slf4j2-impl-2.19.0.jar +log4j-1.2-api/2.20.0//log4j-1.2-api-2.20.0.jar +log4j-api/2.20.0//log4j-api-2.20.0.jar +log4j-core/2.20.0//log4j-core-2.20.0.jar +log4j-slf4j2-impl/2.20.0//log4j-slf4j2-impl-2.20.0.jar logging-interceptor/3.12.12//logging-interceptor-3.12.12.jar lz4-java/1.8.0//lz4-java-1.8.0.jar 
mesos/1.4.3/shaded-protobuf/mesos-1.4.3-shaded-protobuf.jar diff --git a/pom.xml b/pom.xml index 86f6435ee86..61fe9d23b2a 100644 --- a/pom.xml +++ b/pom.xml @@ -118,7 +118,7 @@ 1.6.0 spark 2.0.6 -2.19.0 +2.20.0 3.3.4 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.4 updated: [SPARK-42851][SQL] Guard EquivalentExpressions.addExpr() with supportedExpression()
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 6d5414f2672 [SPARK-42851][SQL] Guard EquivalentExpressions.addExpr() with supportedExpression() 6d5414f2672 is described below commit 6d5414f2672d7fd9a0c8ffe36feef6b3dfb60c74 Author: Kris Mok AuthorDate: Tue Mar 21 21:27:49 2023 +0800 [SPARK-42851][SQL] Guard EquivalentExpressions.addExpr() with supportedExpression() ### What changes were proposed in this pull request? In `EquivalentExpressions.addExpr()`, add a guard `supportedExpression()` to make it consistent with `addExprTree()` and `getExprState()`. ### Why are the changes needed? This fixes a regression caused by https://github.com/apache/spark/pull/39010 which added the `supportedExpression()` to `addExprTree()` and `getExprState()` but not `addExpr()`. One example of a use case affected by the inconsistency is the `PhysicalAggregation` pattern in physical planning. There, it calls `addExpr()` to deduplicate the aggregate expressions, and then calls `getExprState()` to deduplicate the result expressions. Guarding inconsistently will cause the aggregate and result expressions go out of sync, eventually resulting in query execution error (or whole-stage codegen error). ### Does this PR introduce _any_ user-facing change? This fixes a regression affecting Spark 3.3.2+, where it may manifest as an error running aggregate operators with higher-order functions. 
Example running the SQL command: ```sql select max(transform(array(id), x -> x)), max(transform(array(id), x -> x)) from range(2) ``` example error message before the fix: ``` java.lang.IllegalStateException: Couldn't find max(transform(array(id#0L), lambdafunction(lambda x#2L, lambda x#2L, false)))#4 in [max(transform(array(id#0L), lambdafunction(lambda x#1L, lambda x#1L, false)))#3] ``` after the fix this error is gone. ### How was this patch tested? Added new test cases to `SubexpressionEliminationSuite` for the immediate issue, and to `DataFrameAggregateSuite` for an example of user-visible symptom. Closes #40473 from rednaxelafx/spark-42851. Authored-by: Kris Mok Signed-off-by: Wenchen Fan (cherry picked from commit ef0a76eeea30fabb04499908b04124464225f5fd) Signed-off-by: Wenchen Fan --- .../catalyst/expressions/EquivalentExpressions.scala | 6 +- .../expressions/SubexpressionEliminationSuite.scala| 18 +- .../org/apache/spark/sql/DataFrameAggregateSuite.scala | 7 +++ 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala index 3ffd9f9d887..f47391c0492 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala @@ -40,7 +40,11 @@ class EquivalentExpressions { * Returns true if there was already a matching expression. 
*/ def addExpr(expr: Expression): Boolean = { -updateExprInMap(expr, equivalenceMap) +if (supportedExpression(expr)) { + updateExprInMap(expr, equivalenceMap) +} else { + false +} } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala index b16629f59aa..44d8ea3a112 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{BinaryType, DataType, IntegerType} +import org.apache.spark.sql.types.{BinaryType, DataType, IntegerType, ObjectType} class SubexpressionEliminationSuite extends SparkFunSuite with ExpressionEvalHelper { test("Semantic equals and hash") { @@ -449,6 +449,22 @@ class SubexpressionEliminationSuite extends SparkFunSuite with ExpressionEvalHel assert(e2.getCommonSubexpressions.size == 1) assert(e2.getCommonSubexpressions.head == add) } + + test("SPARK-42851: Handle supportExpression consistently across add and get") { +val expr = { + val function =
[spark] branch master updated (c9a530e38e7 -> ef0a76eeea3)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from c9a530e38e7 [SPARK-42876][SQL] DataType's physicalDataType should be private[sql] add ef0a76eeea3 [SPARK-42851][SQL] Guard EquivalentExpressions.addExpr() with supportedExpression() No new revisions were added by this update. Summary of changes: .../catalyst/expressions/EquivalentExpressions.scala | 6 +- .../expressions/SubexpressionEliminationSuite.scala| 18 +- .../org/apache/spark/sql/DataFrameAggregateSuite.scala | 7 +++ 3 files changed, 29 insertions(+), 2 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.4 updated: [SPARK-42876][SQL] DataType's physicalDataType should be private[sql]
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 8cffa5c9c9f [SPARK-42876][SQL] DataType's physicalDataType should be private[sql] 8cffa5c9c9f is described below commit 8cffa5c9c9f83d6c0ba2e6d490c2e658e089303a Author: Rui Wang AuthorDate: Tue Mar 21 16:26:47 2023 +0800 [SPARK-42876][SQL] DataType's physicalDataType should be private[sql] ### What changes were proposed in this pull request? `physicalDataType` should not be a public API but be private[sql]. ### Why are the changes needed? This limits the API scope so that an unnecessary API is not exposed publicly. ### Does this PR introduce _any_ user-facing change? No, since we have not released Spark 3.4.0 yet. ### How was this patch tested? N/A Closes #40499 from amaliujia/change_scope_of_physical_data_type. Authored-by: Rui Wang Signed-off-by: Wenchen Fan (cherry picked from commit c9a530e38e7f4ff3a491245c1d3ecaa1755c87ad) Signed-off-by: Wenchen Fan --- sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala | 2 +- .../src/main/scala/org/apache/spark/sql/types/BinaryType.scala | 2 +- .../src/main/scala/org/apache/spark/sql/types/BooleanType.scala| 2 +- sql/catalyst/src/main/scala/org/apache/spark/sql/types/ByteType.scala | 2 +- .../main/scala/org/apache/spark/sql/types/CalendarIntervalType.scala | 2 +- sql/catalyst/src/main/scala/org/apache/spark/sql/types/CharType.scala | 2 +- sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala | 2 +- sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala | 2 +- .../main/scala/org/apache/spark/sql/types/DayTimeIntervalType.scala| 2 +- .../src/main/scala/org/apache/spark/sql/types/DecimalType.scala| 3 ++- .../src/main/scala/org/apache/spark/sql/types/DoubleType.scala | 2 +- 
sql/catalyst/src/main/scala/org/apache/spark/sql/types/FloatType.scala | 2 +- .../src/main/scala/org/apache/spark/sql/types/IntegerType.scala| 2 +- sql/catalyst/src/main/scala/org/apache/spark/sql/types/LongType.scala | 2 +- sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala | 2 +- sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala | 2 +- sql/catalyst/src/main/scala/org/apache/spark/sql/types/ShortType.scala | 2 +- .../src/main/scala/org/apache/spark/sql/types/StringType.scala | 2 +- .../src/main/scala/org/apache/spark/sql/types/StructType.scala | 2 +- .../src/main/scala/org/apache/spark/sql/types/TimestampNTZType.scala | 2 +- .../src/main/scala/org/apache/spark/sql/types/TimestampType.scala | 2 +- .../src/main/scala/org/apache/spark/sql/types/VarcharType.scala| 2 +- .../main/scala/org/apache/spark/sql/types/YearMonthIntervalType.scala | 2 +- 23 files changed, 24 insertions(+), 23 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala index 3e5f447a762..9665385f046 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala @@ -91,7 +91,7 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT */ override def defaultSize: Int = 1 * elementType.defaultSize - override def physicalDataType: PhysicalDataType = + private[sql] override def physicalDataType: PhysicalDataType = PhysicalArrayType(elementType, containsNull) override def simpleString: String = s"array<${elementType.simpleString}>" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala index d2998f533de..cba437dc68f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala @@ -45,7 +45,7 @@ class BinaryType private() extends AtomicType { */ override def defaultSize: Int = 100 - override def physicalDataType: PhysicalDataType = PhysicalBinaryType + private[sql] override def physicalDataType: PhysicalDataType = PhysicalBinaryType private[spark] override def asNullable: BinaryType = this } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala index d8766e95e20..ba707dc4548 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala @@ -42,7 +42,7 @@ class BooleanType private() extends AtomicType {
[spark] branch master updated: [SPARK-42876][SQL] DataType's physicalDataType should be private[sql]
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c9a530e38e7 [SPARK-42876][SQL] DataType's physicalDataType should be private[sql] c9a530e38e7 is described below commit c9a530e38e7f4ff3a491245c1d3ecaa1755c87ad Author: Rui Wang AuthorDate: Tue Mar 21 16:26:47 2023 +0800 [SPARK-42876][SQL] DataType's physicalDataType should be private[sql] ### What changes were proposed in this pull request? `physicalDataType` should not be a public API but be private[sql]. ### Why are the changes needed? This limits the API scope so that an unnecessary API is not exposed publicly. ### Does this PR introduce _any_ user-facing change? No, since we have not released Spark 3.4.0 yet. ### How was this patch tested? N/A Closes #40499 from amaliujia/change_scope_of_physical_data_type. Authored-by: Rui Wang Signed-off-by: Wenchen Fan --- sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala | 2 +- .../src/main/scala/org/apache/spark/sql/types/BinaryType.scala | 2 +- .../src/main/scala/org/apache/spark/sql/types/BooleanType.scala| 2 +- sql/catalyst/src/main/scala/org/apache/spark/sql/types/ByteType.scala | 2 +- .../main/scala/org/apache/spark/sql/types/CalendarIntervalType.scala | 2 +- sql/catalyst/src/main/scala/org/apache/spark/sql/types/CharType.scala | 2 +- sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala | 2 +- sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala | 2 +- .../main/scala/org/apache/spark/sql/types/DayTimeIntervalType.scala| 2 +- .../src/main/scala/org/apache/spark/sql/types/DecimalType.scala| 3 ++- .../src/main/scala/org/apache/spark/sql/types/DoubleType.scala | 2 +- sql/catalyst/src/main/scala/org/apache/spark/sql/types/FloatType.scala | 2 +- .../src/main/scala/org/apache/spark/sql/types/IntegerType.scala| 2 +- 
sql/catalyst/src/main/scala/org/apache/spark/sql/types/LongType.scala | 2 +- sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala | 2 +- sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala | 2 +- sql/catalyst/src/main/scala/org/apache/spark/sql/types/ShortType.scala | 2 +- .../src/main/scala/org/apache/spark/sql/types/StringType.scala | 2 +- .../src/main/scala/org/apache/spark/sql/types/StructType.scala | 2 +- .../src/main/scala/org/apache/spark/sql/types/TimestampNTZType.scala | 2 +- .../src/main/scala/org/apache/spark/sql/types/TimestampType.scala | 2 +- .../src/main/scala/org/apache/spark/sql/types/VarcharType.scala| 2 +- .../main/scala/org/apache/spark/sql/types/YearMonthIntervalType.scala | 2 +- 23 files changed, 24 insertions(+), 23 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala index 3e5f447a762..9665385f046 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala @@ -91,7 +91,7 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT */ override def defaultSize: Int = 1 * elementType.defaultSize - override def physicalDataType: PhysicalDataType = + private[sql] override def physicalDataType: PhysicalDataType = PhysicalArrayType(elementType, containsNull) override def simpleString: String = s"array<${elementType.simpleString}>" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala index d2998f533de..cba437dc68f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala @@ -45,7 +45,7 @@ class BinaryType private() extends AtomicType { */ override def defaultSize: Int = 
100 - override def physicalDataType: PhysicalDataType = PhysicalBinaryType + private[sql] override def physicalDataType: PhysicalDataType = PhysicalBinaryType private[spark] override def asNullable: BinaryType = this } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala index d8766e95e20..ba707dc4548 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala @@ -42,7 +42,7 @@ class BooleanType private() extends AtomicType { */ override def defaultSize: Int = 1 - override def physicalDataType: PhysicalDataType =
[spark] branch branch-3.4 updated: [SPARK-42340][CONNECT][PYTHON][3.4] Implement Grouped Map API
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new ed797bbdaeb [SPARK-42340][CONNECT][PYTHON][3.4] Implement Grouped Map API ed797bbdaeb is described below commit ed797bbdaeb1e421ca1d14620f72257560896a1c Author: Xinrong Meng AuthorDate: Tue Mar 21 16:42:43 2023 +0900 [SPARK-42340][CONNECT][PYTHON][3.4] Implement Grouped Map API ### What changes were proposed in this pull request? Implement Grouped Map API:`GroupedData.applyInPandas` and `GroupedData.apply`. ### Why are the changes needed? Parity with vanilla PySpark. ### Does this PR introduce _any_ user-facing change? Yes. `GroupedData.applyInPandas` and `GroupedData.apply` are supported now, as shown below. ```sh >>> df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],("id", "v")) >>> def normalize(pdf): ... v = pdf.v ... return pdf.assign(v=(v - v.mean()) / v.std()) ... >>> df.groupby("id").applyInPandas(normalize, schema="id long, v double").show() +---+---+ | id| v| +---+---+ | 1|-0.7071067811865475| | 1| 0.7071067811865475| | 2|-0.8320502943378437| | 2|-0.2773500981126146| | 2| 1.1094003924504583| +---+---+ ``` ```sh >>> pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) ... def normalize(pdf): ... v = pdf.v ... return pdf.assign(v=(v - v.mean()) / v.std()) ... >>> df.groupby("id").apply(normalize).show() /Users/xinrong.meng/spark/python/pyspark/sql/connect/group.py:228: UserWarning: It is preferred to use 'applyInPandas' over this API. This API will be deprecated in the future releases. See SPARK-28264 for more details. warnings.warn( +---+---+ | id| v| +---+---+ | 1|-0.7071067811865475| | 1| 0.7071067811865475| | 2|-0.8320502943378437| | 2|-0.2773500981126146| | 2| 1.1094003924504583| +---+---+ ``` ### How was this patch tested? (Parity) Unit tests. 
Closes #40486 from xinrong-meng/group_map3.4. Authored-by: Xinrong Meng Signed-off-by: Hyukjin Kwon --- .../main/protobuf/spark/connect/relations.proto| 12 + .../sql/connect/planner/SparkConnectPlanner.scala | 14 ++ dev/sparktestsupport/modules.py| 1 + python/pyspark/sql/connect/_typing.py | 10 +- python/pyspark/sql/connect/group.py| 61 +- python/pyspark/sql/connect/plan.py | 27 +++ python/pyspark/sql/connect/proto/relations_pb2.py | 242 +++-- python/pyspark/sql/connect/proto/relations_pb2.pyi | 51 + python/pyspark/sql/pandas/group_ops.py | 6 + .../sql/tests/connect/test_connect_basic.py| 2 - .../connect/test_parity_pandas_grouped_map.py | 102 + .../sql/tests/connect/test_parity_pandas_udf.py| 5 - .../sql/tests/pandas/test_pandas_grouped_map.py| 6 +- 13 files changed, 411 insertions(+), 128 deletions(-) diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto index 69451e7b76e..aba965082ea 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto @@ -63,6 +63,7 @@ message Relation { MapPartitions map_partitions = 28; CollectMetrics collect_metrics = 29; Parse parse = 30; +GroupMap group_map = 31; // NA functions NAFill fill_na = 90; @@ -788,6 +789,17 @@ message MapPartitions { CommonInlineUserDefinedFunction func = 2; } +message GroupMap { + // (Required) Input relation for Group Map API: apply, applyInPandas. + Relation input = 1; + + // (Required) Expressions for grouping keys. + repeated Expression grouping_expressions = 2; + + // (Required) Input user-defined function. + CommonInlineUserDefinedFunction func = 3; +} + // Collect arbitrary (named) metrics from a dataset. message CollectMetrics { // (Required) The input relation. 
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index ded5ffa78f9..41a867d3b9d 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -116,6 +116,8 @@ class
[spark] branch master updated: [SPARK-40822][SQL] Stable derived column aliases
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 83a40743153 [SPARK-40822][SQL] Stable derived column aliases 83a40743153 is described below commit 83a40743153c445eca3798f7c24a362231f76022 Author: Max Gekk AuthorDate: Tue Mar 21 09:14:22 2023 +0300 [SPARK-40822][SQL] Stable derived column aliases ### What changes were proposed in this pull request? In the PR, I propose to change auto-generation of column aliases (the case when a user doesn't assign any alias explicitly). Before the changes, Spark SQL generates such an alias from `Expression` but this PR proposes to take the parse tree (output of lexer), and generate an alias using the term tokens from the tree. New helper function `ParserUtils.toExprAlias` takes a `ParseTree` from `Antlr4`, and converts it to a `String` using the following simple rule: - Concatenate all terminal nodes of the lexer tree without any gaps. - Upper case keywords and numeric literals. For example, the sequence of tokens "1", "in", "(", "1.0d", ")" is converted to the alias "1IN(1.0D)". By default, the feature is off, and the SQL config `spark.sql.stableDerivedColumnAlias.enabled` can be used to enable it. Closes #39332 ### Why are the changes needed? To improve user experience with Spark SQL. It is always best practice to name the result of any expression in a query's select list, if one plans to reference it later. This yields the most readable results and stability. However, sometimes queries are generated or we’re just lazy and trust in the auto-generated names. The problem is that the auto-generated names are produced by pretty printing the expression tree which is, while “generally” readable, not meant to be stable across [...] 
```sql spark-sql> DESC SELECT substring('hello', 5); substring(hello, 5, 2147483647) string ``` the auto-generated column alias `substring(hello, 5, 2147483647)` contains non-obvious elements. ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? By running new test: ``` $ build/sbt "test:testOnly *.ResolveAliasesSuite" ``` Closes #40126 from MaxGekk/stable-derived-column-aliases-2. Authored-by: Max Gekk Signed-off-by: Max Gekk --- .../spark/sql/catalyst/parser/SqlBaseLexer.g4 | 6 ++- .../spark/sql/catalyst/analysis/Analyzer.scala | 15 ++- .../spark/sql/catalyst/parser/AstBuilder.scala | 23 +++ .../spark/sql/catalyst/parser/ParserUtils.scala| 33 +-- .../org/apache/spark/sql/internal/SQLConf.scala| 9 .../catalyst/analysis/ResolveAliasesSuite.scala| 48 ++ 6 files changed, 112 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 index 6d0862290cf..17f7c49052a 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 @@ -81,7 +81,8 @@ LEFT_BRACKET: '['; RIGHT_BRACKET: ']'; // NOTE: If you add a new token in the list below, you should update the list of keywords -// and reserved tag in `docs/sql-ref-ansi-compliance.md#sql-keywords`. +// and reserved tag in `docs/sql-ref-ansi-compliance.md#sql-keywords`, and +// modify `ParserUtils.toExprAlias()` which assumes all keywords are between `ADD` and `ZONE`. // // Start of the keywords list @@ -426,6 +427,9 @@ DOUBLEQUOTED_STRING :'"' ( ~('"'|'\\') | ('\\' .) )* '"' ; +// NOTE: If you move a numeric literal, you should modify `ParserUtils.toExprAlias()` +// which assumes all numeric literals are between `BIGINT_LITERAL` and `BIGDECIMAL_LITERAL`. 
+ BIGINT_LITERAL : DIGIT+ 'L' ; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 3a2dff78cba..8821e652a31 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -478,11 +478,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor case _: Attribute => true case _ => false } - def metaForAutoGeneratedAlias = { -new MetadataBuilder() - .putString("__autoGeneratedAlias", "true") - .build() - } exprs.map(_.transformUpWithPruning(_.containsPattern(UNRESOLVED_ALIAS)) { case u @