[spark] branch master updated: [SPARK-41102][CONNECT] Merge SparkConnectPlanner and SparkConnectCommandPlanner
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new fd1e0d028cb [SPARK-41102][CONNECT] Merge SparkConnectPlanner and SparkConnectCommandPlanner fd1e0d028cb is described below commit fd1e0d028cb7e26921cd66a421c00d7260092b23 Author: Rui Wang AuthorDate: Fri Nov 11 15:48:21 2022 +0800 [SPARK-41102][CONNECT] Merge SparkConnectPlanner and SparkConnectCommandPlanner ### What changes were proposed in this pull request? In the past, Connect server side separates `Command` and `Relation` into two Planners. However, as we are adding new API, there are certainly cases that a `Command` still has an input which is a Relation. Thus when converting `Command`, it still needs to access the logic of converting `Relation`. View creation is an example of such cases. Usually DDL and DML of SQL will also follow. This PR refactors to merge the logic of dealing with `Command` and `Relation` into the same planner. ### Why are the changes needed? Refactoring. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? Existing UT Closes #38604 from amaliujia/refactor-planners. Authored-by: Rui Wang Signed-off-by: Wenchen Fan --- .../command/SparkConnectCommandPlanner.scala | 174 - .../sql/connect/planner/SparkConnectPlanner.scala | 152 +- .../sql/connect/service/SparkConnectService.scala | 2 +- .../service/SparkConnectStreamHandler.scala| 9 +- .../planner/SparkConnectCommandPlannerSuite.scala | 160 --- .../connect/planner/SparkConnectPlannerSuite.scala | 25 +-- .../connect/planner/SparkConnectProtoSuite.scala | 128 ++- 7 files changed, 291 insertions(+), 359 deletions(-) diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala deleted file mode 100644 index 11090976c7f..000 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.connect.command - -import scala.collection.JavaConverters._ - -import com.google.common.collect.{Lists, Maps} - -import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction} -import org.apache.spark.connect.proto -import org.apache.spark.connect.proto.WriteOperation -import org.apache.spark.sql.{Dataset, SparkSession} -import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView} -import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.connect.planner.{DataTypeProtoConverter, SparkConnectPlanner} -import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.execution.command.CreateViewCommand -import org.apache.spark.sql.execution.python.UserDefinedPythonFunction -import org.apache.spark.sql.types.StringType - -final case class InvalidCommandInput( -private val message: String = "", -private val cause: Throwable = null) -extends Exception(message, cause) - -class SparkConnectCommandPlanner(session: SparkSession, command: proto.Command) { - - lazy val pythonExec = -sys.env.getOrElse("PYSPARK_PYTHON", sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3")) - - def process(): Unit = { -command.getCommandTypeCase match { - case proto.Command.CommandTypeCase.CREATE_FUNCTION => -handleCreateScalarFunction(command.getCreateFunction) - case proto.Command.CommandTypeCase.WRITE_OPERATION => -handleWriteOperation(command.getWriteOperation) - case proto.Command.CommandTypeCase.CREATE_DATAFRAME_VIEW => -handleCreateViewCommand(command.getCreateDataframeView) - case _ => throw new UnsupportedOperationException(s"$command not supported.") -} - }
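The refactoring above folds command handling into the relation planner so that commands whose inputs are relations (such as view creation) can reuse the relation-to-plan conversion. A hypothetical Python toy sketch of that shape, not the actual Scala `SparkConnectPlanner`:

```python
# Toy sketch only; the real implementation is the Scala SparkConnectPlanner.
# The point: the command handler can call the relation transform on the same object.
class MergedPlanner:
    def transform_relation(self, rel):
        # Stand-in for converting a proto Relation into a logical plan.
        return {"logical_plan": rel}

    def process_command(self, command):
        if command["type"] == "create_dataframe_view":
            # A view command carries a Relation as its input, so it needs
            # the relation transform; no separate command planner is required.
            child = self.transform_relation(command["input"])
            return {"create_view": command["name"], "child": child}
        raise NotImplementedError(f"{command['type']} not supported")

planner = MergedPlanner()
print(planner.process_command(
    {"type": "create_dataframe_view", "name": "tmp_view", "input": "range(10)"}))
```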
[spark] branch master updated: [SPARK-41095][SQL] Convert unresolved operators to internal errors
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 38897b1ce96 [SPARK-41095][SQL] Convert unresolved operators to internal errors 38897b1ce96 is described below commit 38897b1ce96a1e24629875619c165b4fd1fa2d8f Author: Max Gekk AuthorDate: Fri Nov 11 09:37:10 2022 +0300 [SPARK-41095][SQL] Convert unresolved operators to internal errors ### What changes were proposed in this pull request? In the PR, I propose to interpret the `unresolved operator` issue as an internal error, and throw `SparkException` w/ the error class `INTERNAL_ERROR`. ### Why are the changes needed? The issues that leads to `unresolved operator` should be solved earlier w/ proper user-facing errors. If we reach the point when we cannot resolve an operator, we should interpret this as Spark SQL internal error. ### Does this PR introduce _any_ user-facing change? No, in most regular cases. ### How was this patch tested? By running the affected and modified test suites: ``` $ PYSPARK_PYTHON=python3 build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite" $ build/sbt "test:testOnly *AnalysisErrorSuite" ``` Closes #38582 from MaxGekk/unresolved-op-internal-error. Authored-by: Max Gekk Signed-off-by: Max Gekk --- core/src/main/resources/error/error-classes.json | 5 - core/src/main/scala/org/apache/spark/SparkException.scala| 10 -- .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 8 +--- .../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 12 4 files changed, 21 insertions(+), 14 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 2b626ba5761..63978e6be66 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -5107,11 +5107,6 @@ "Invalid expressions: []" ] }, - "_LEGACY_ERROR_TEMP_2442" : { -"message" : [ - "unresolved operator " -] - }, "_LEGACY_ERROR_TEMP_2443" : { "message" : [ "Multiple definitions of observed metrics named '': " diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index 03938444e12..2f05b2ad6a7 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -68,11 +68,17 @@ class SparkException( } object SparkException { - def internalError(msg: String): SparkException = { + def internalError(msg: String, context: Array[QueryContext], summary: String): SparkException = { new SparkException( errorClass = "INTERNAL_ERROR", messageParameters = Map("message" -> msg), - cause = null) + cause = null, + context, + summary) + } + + def internalError(msg: String): SparkException = { +internalError(msg, context = Array.empty[QueryContext], summary = "") } def internalError(msg: String, cause: Throwable): SparkException = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index f88ef522f34..285c7396124 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.analysis import 
scala.collection.mutable +import org.apache.spark.SparkException import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.SubExprUtils._ @@ -713,9 +714,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB extendedCheckRules.foreach(_(plan)) plan.foreachUp { case o if !o.resolved => -o.failAnalysis( - errorClass = "_LEGACY_ERROR_TEMP_2442", - messageParameters = Map("operator" -> o.simpleString(SQLConf.get.maxToStringFields))) +throw SparkException.internalError( + msg = s"Found the unresolved operator: ${o.simpleString(SQLConf.get.maxToStringFields)}", + context = o.origin.getQueryContext, + summary = o.origin.context.summary) case _ => } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 30116d410f7..93006224814 100644 ---
[spark] branch master updated (4a36c151ea1 -> 649c87780e5)
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 4a36c151ea1 [SPARK-41108][CONNECT] Control the max size of arrow batch add 649c87780e5 [SPARK-41059][SQL] Rename `_LEGACY_ERROR_TEMP_2420` to `NESTED_AGGREGATE_FUNCTION` No new revisions were added by this update. Summary of changes: core/src/main/resources/error/error-classes.json | 10 +- .../spark/sql/catalyst/analysis/CheckAnalysis.scala | 2 +- .../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 16 +--- .../src/test/resources/sql-tests/results/pivot.sql.out | 2 +- .../results/postgreSQL/aggregates_part3.sql.out | 2 +- .../results/udf/postgreSQL/udf-aggregates_part3.sql.out | 2 +- .../resources/sql-tests/results/udf/udf-pivot.sql.out| 2 +- 7 files changed, 19 insertions(+), 17 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
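For context, `NESTED_AGGREGATE_FUNCTION` covers aggregates nested inside other aggregates. A hedged PySpark illustration of a query expected to raise it (exact message wording depends on the build):

```python
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder.getOrCreate()
spark.range(10).selectExpr("id % 2 AS g", "id AS v").createOrReplaceTempView("t")

try:
    # sum(v) nested inside max(...) is not allowed.
    spark.sql("SELECT max(sum(v)) FROM t GROUP BY g").collect()
except AnalysisException as e:
    print(e)  # expected to mention NESTED_AGGREGATE_FUNCTION on builds with this change
```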
[spark] branch master updated: [SPARK-41108][CONNECT] Control the max size of arrow batch
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4a36c151ea1 [SPARK-41108][CONNECT] Control the max size of arrow batch 4a36c151ea1 is described below commit 4a36c151ea1cf8372ddefbe0a75f3470bfbe1587 Author: Ruifeng Zheng AuthorDate: Fri Nov 11 14:35:31 2022 +0900 [SPARK-41108][CONNECT] Control the max size of arrow batch ### What changes were proposed in this pull request? Control the max size of arrow batch ### Why are the changes needed? as per the suggestion https://github.com/apache/spark/pull/38468#discussion_r1018951362 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing tests Closes #38612 from zhengruifeng/connect_arrow_batchsize. Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon --- .../service/SparkConnectStreamHandler.scala| 6 ++ .../sql/execution/arrow/ArrowConverters.scala | 25 -- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala index 3b734616b21..9652fce5425 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.arrow.ArrowConverters class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) extends Logging { // The maximum batch size in bytes for a single batch of data to be returned via proto. 
- val MAX_BATCH_SIZE: Long = 10 * 1024 * 1024 + private val MAX_BATCH_SIZE: Long = 4 * 1024 * 1024 def handle(v: Request): Unit = { val session = @@ -127,8 +127,6 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte def processAsArrowBatches(clientId: String, dataframe: DataFrame): Unit = { val spark = dataframe.sparkSession val schema = dataframe.schema -// TODO: control the batch size instead of max records -val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone SQLExecution.withNewExecutionId(dataframe.queryExecution, Some("collectArrow")) { @@ -141,7 +139,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte val batches = rows.mapPartitionsInternal { iter => ArrowConverters -.toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId) +.toBatchWithSchemaIterator(iter, schema, MAX_BATCH_SIZE, timeZoneId) } val signal = new Object diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala index a2dce31bc6d..c233ac32c12 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala @@ -33,12 +33,12 @@ import org.apache.spark.internal.Logging import org.apache.spark.network.util.JavaUtils import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.types._ import org.apache.spark.sql.util.ArrowUtils import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector} -import org.apache.spark.util.{ByteBufferOutputStream, Utils} +import org.apache.spark.util.{ByteBufferOutputStream, SizeEstimator, Utils} /** @@ -128,10 +128,14 @@ private[sql] object ArrowConverters extends Logging { } } - private[sql] def toArrowBatchIterator( + /** + * Convert the input rows into fully contained arrow batches. + * Different from [[toBatchIterator]], each output arrow batch starts with the schema. + */ + private[sql] def toBatchWithSchemaIterator( rowIter: Iterator[InternalRow], schema: StructType, - maxRecordsPerBatch: Int, + maxBatchSize: Long, timeZoneId: String): Iterator[(Array[Byte], Long)] = { val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId) val allocator = ArrowUtils.rootAllocator.newChildAllocator( @@ -140,6 +144,7 @@ private[sql] object ArrowConverters extends
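The change replaces the per-batch record cap with a byte-size cap when slicing rows into Arrow batches. A rough Python sketch of the size-capped grouping idea, with `sys.getsizeof` standing in for Spark's `SizeEstimator` (the real logic lives in the Scala `ArrowConverters.toBatchWithSchemaIterator`):

```python
import sys
from typing import Iterable, Iterator, List

MAX_BATCH_SIZE = 4 * 1024 * 1024  # 4 MiB, matching the new server-side constant

def size_capped_batches(rows: Iterable[object],
                        max_batch_size: int = MAX_BATCH_SIZE) -> Iterator[List[object]]:
    """Yield batches whose estimated byte size stays under max_batch_size.

    Each batch keeps at least one row, so a single oversized row still flows through.
    """
    batch: List[object] = []
    batch_bytes = 0
    for row in rows:
        row_bytes = sys.getsizeof(row)
        if batch and batch_bytes + row_bytes > max_batch_size:
            yield batch
            batch, batch_bytes = [], 0
        batch.append(row)
        batch_bytes += row_bytes
    if batch:
        yield batch

# Roughly 1 KiB strings grouped into batches of about 4 MiB each.
batches = list(size_capped_batches(["x" * 1024 for _ in range(10_000)]))
print(len(batches), "batches")
```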
[spark] branch master updated (2a8ed47aaeb -> 73bca6e5cac)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 2a8ed47aaeb [SPARK-41036][CONNECT][PYTHON] `columns` API should use `schema` API to avoid data fetching add 73bca6e5cac Revert "[SPARK-41063][BUILD] Clean all except files in Git repository before running Mima" No new revisions were added by this update. Summary of changes: dev/run-tests.py | 1 - 1 file changed, 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41036][CONNECT][PYTHON] `columns` API should use `schema` API to avoid data fetching
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2a8ed47aaeb [SPARK-41036][CONNECT][PYTHON] columns` API should use `schema` API to avoid data fetching 2a8ed47aaeb is described below commit 2a8ed47aaeb226dff45fe6cca38e30833bb0eae6 Author: Rui Wang AuthorDate: Fri Nov 11 11:22:03 2022 +0800 [SPARK-41036][CONNECT][PYTHON] columns` API should use `schema` API to avoid data fetching ### What changes were proposed in this pull request? Current `columns` is implemented based on `limit` which runs a job to fetch data and get schema from the data collection. However a more efficient way is to call `schema` API which only need to analyze the plan without collect data. This approach should be more efficient in most of the cases. ### Why are the changes needed? Efficiency ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes #38546 from amaliujia/improve_python_columns. Authored-by: Rui Wang Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/connect/dataframe.py| 9 ++--- python/pyspark/sql/tests/connect/test_connect_basic.py | 5 + 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 0c19c67309d..f2e528fc83c 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -140,13 +140,8 @@ class DataFrame(object): """Returns the list of columns of the current data frame.""" if self._plan is None: return [] -if "columns" not in self._cache and self._plan is not None: -pdd = self.limit(0).toPandas() -if pdd is None: -raise Exception("Empty result") -# Translate to standard pytho array -self._cache["columns"] = pdd.columns.values -return self._cache["columns"] + +return self.schema().names def count(self) -> int: """Returns the number of rows in the data frame""" diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py index b2509c75dd6..3f75eb1b18f 100644 --- a/python/pyspark/sql/tests/connect/test_connect_basic.py +++ b/python/pyspark/sql/tests/connect/test_connect_basic.py @@ -106,6 +106,11 @@ class SparkConnectTests(SparkConnectSQLTestCase): # Check that the limit is applied self.assertEqual(len(data.index), 10) +def test_columns(self): +# SPARK-41036: test `columns` API for python client. +columns = self.connect.read.table(self.tbl_name).columns +self.assertEqual(["id", "name"], columns) + def test_collect(self): df = self.connect.read.table(self.tbl_name) data = df.limit(10).collect() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
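From the client's point of view nothing changes except the cost: column names now come from plan analysis rather than from fetching rows. A minimal usage sketch, assuming a Spark Connect Python session bound to the two-column test table used above:

```python
# 'spark' is assumed to be a Spark Connect session with 'test_table' registered.
df = spark.read.table("test_table")

# Previously this ran limit(0).toPandas() (a job); now it only analyzes the plan.
print(df.columns)  # e.g. ['id', 'name']
```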
[spark] branch master updated: [SPARK-41105][CONNECT] Adopt `optional` keyword from proto3 which offers `hasXXX` to differentiate if a field is set or unset
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 46fab54b500 [SPARK-41105][CONNECT] Adopt `optional` keyword from proto3 which offers `hasXXX` to differentiate if a field is set or unset 46fab54b500 is described below commit 46fab54b500c579cd421fb9e8ea95fae0ddda87d Author: Rui Wang AuthorDate: Fri Nov 11 12:04:34 2022 +0900 [SPARK-41105][CONNECT] Adopt `optional` keyword from proto3 which offers `hasXXX` to differentiate if a field is set or unset ### What changes were proposed in this pull request? We used to wrap those fields into messages to acquire the ability to tell if those field is set or unset. It turns out proto3 offers built-in mechanism to achieve the same thing: https://developers.google.com/protocol-buffers/docs/proto3#specifying_field_rules. It is as easy as adding `optional` keyword to the field to auto-generate `hasXXX` method. This PR refactors existing proto to get rid of redundant message definitions. ### Why are the changes needed? Codebase simplification. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? Existing UT Closes #38606 from amaliujia/refactor_proto. Authored-by: Rui Wang Signed-off-by: Hyukjin Kwon --- .../main/protobuf/spark/connect/relations.proto| 12 +--- .../org/apache/spark/sql/connect/dsl/package.scala | 5 +- .../sql/connect/planner/SparkConnectPlanner.scala | 4 +- python/pyspark/sql/connect/plan.py | 6 +- python/pyspark/sql/connect/proto/relations_pb2.py | 40 ++-- python/pyspark/sql/connect/proto/relations_pb2.pyi | 71 ++ .../sql/tests/connect/test_connect_plan_only.py| 4 +- 7 files changed, 61 insertions(+), 81 deletions(-) diff --git a/connector/connect/src/main/protobuf/spark/connect/relations.proto b/connector/connect/src/main/protobuf/spark/connect/relations.proto index 639d1bafce5..4f30b5bfbde 100644 --- a/connector/connect/src/main/protobuf/spark/connect/relations.proto +++ b/connector/connect/src/main/protobuf/spark/connect/relations.proto @@ -215,11 +215,7 @@ message Sample { double lower_bound = 2; double upper_bound = 3; bool with_replacement = 4; - Seed seed = 5; - - message Seed { -int64 seed = 1; - } + optional int64 seed = 5; } // Relation of type [[Range]] that generates a sequence of integers. @@ -232,11 +228,7 @@ message Range { int64 step = 3; // Optional. Default value is assigned by 1) SQL conf "spark.sql.leafNodeDefaultParallelism" if // it is set, or 2) spark default parallelism. - NumPartitions num_partitions = 4; - - message NumPartitions { -int32 num_partitions = 1; - } + optional int32 num_partitions = 4; } // Relation alias. 
diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala index 5e7a94da347..f55ed835d23 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala @@ -216,8 +216,7 @@ package object dsl { range.setStep(1L) } if (numPartitions.isDefined) { - range.setNumPartitions( - proto.Range.NumPartitions.newBuilder().setNumPartitions(numPartitions.get)) + range.setNumPartitions(numPartitions.get) } Relation.newBuilder().setRange(range).build() } @@ -376,7 +375,7 @@ package object dsl { .setUpperBound(upperBound) .setLowerBound(lowerBound) .setWithReplacement(withReplacement) - .setSeed(Sample.Seed.newBuilder().setSeed(seed).build()) + .setSeed(seed) .build()) .build() } diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 04ce880a925..b91fef58a11 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -104,7 +104,7 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) { rel.getLowerBound, rel.getUpperBound, rel.getWithReplacement, - if (rel.hasSeed) rel.getSeed.getSeed else Utils.random.nextLong, + if (rel.hasSeed) rel.getSeed else Utils.random.nextLong, transformRelation(rel.getInput)) } @@ -117,7 +117,7 @@ class SparkConnectPlanner(plan: proto.Relation,
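With proto3 `optional`, field presence is tracked by protobuf itself, so wrapper messages like `Sample.Seed` are no longer needed. A hedged sketch against the regenerated Python classes (module and message names assumed to match `relations_pb2` in the diff):

```python
from pyspark.sql.connect.proto import relations_pb2 as proto

sample = proto.Sample()
print(sample.HasField("seed"))  # False: the optional field is unset
sample.seed = 42
print(sample.HasField("seed"))  # True
print(sample.seed)              # 42
```

The Scala/Java side gets the equivalent `hasSeed()`/`getSeed()` accessors, which is what the planner diff above relies on.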
[spark] branch master updated (0cb79a4af27 -> 139b62c2999)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 0cb79a4af27 [SPARK-40281][PYTHON] Memory Profiler on Executors add 139b62c2999 [SPARK-41001][CONNECT] Make `userId` optional in SparkRemoteSession No new revisions were added by this update. Summary of changes: python/pyspark/sql/connect/client.py | 69 -- .../sql/tests/connect/test_connect_basic.py| 18 -- 2 files changed, 65 insertions(+), 22 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-40281][PYTHON] Memory Profiler on Executors
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0cb79a4af27 [SPARK-40281][PYTHON] Memory Profiler on Executors 0cb79a4af27 is described below commit 0cb79a4af27eb226a4f7574aa1c8b5441583 Author: Xinrong Meng AuthorDate: Fri Nov 11 11:59:00 2022 +0900 [SPARK-40281][PYTHON] Memory Profiler on Executors ### What changes were proposed in this pull request? Implement PySpark memory profiling on executors. The feature is enabled via a newly-introduced Spark configuration `spark.python.profile.memory`. See more [design.](https://docs.google.com/document/d/e/2PACX-1vR2K4TdrM1eAjNDC1bsflCNRH67UWLoC-lCv6TSUVXD91Ruksm99pYTnCeIm7Ui3RgrrRNcQU_D8-oh/pub) ### Why are the changes needed? There are many factors in a PySpark program’s performance. Memory, as one of the key factors of a program’s performance, had been missing in PySpark profiling. A PySpark program on the Spark driver can be profiled with [Memory Profiler](https://www.google.com/url?q=https://pypi.org/project/memory-profiler/=D=editors=1668027860192689=AOvVaw1t4LRcObEGuhaTr5oHEUwU) as a normal Python process, but there was not an easy way to profile memory on Spark executors. PySpark UDFs, one of the most popular Python APIs, enable users to run custom code on top of the Apache Spark™ engine. However, it is difficult to optimize UDFs without understanding memory consumption. The PR proposes to introduce the PySpark memory profiler, which profiles memory on executors. It provides information about total memory usage and pinpoints which lines of code in a UDF attribute to the most memory usage. That will help optimize PySpark UDFs and reduce the likelihood of out-of-memory errors. ### Does this PR introduce _any_ user-facing change? No changes to existing user-facing behaviors. A Spark configuration `spark.python.profile.memory` is introduced to enable the PySpark memory profiling feature. ### How was this patch tested? - Unit tests. - Manual tests on Jupyter notebooks as shown below: ![image](https://user-images.githubusercontent.com/47337188/200998618-73eb5bd1-83ba-4256-9ba4-f6fb4afcd1bd.png) Closes #38584 from xinrong-meng/memory_profile. 
Authored-by: Xinrong Meng Signed-off-by: Hyukjin Kwon --- dev/requirements.txt | 1 + dev/sparktestsupport/modules.py | 1 + python/pyspark/context.py| 22 +- python/pyspark/profiler.py | 297 +-- python/pyspark/rdd.py| 5 +- python/pyspark/sql/udf.py| 70 +-- python/pyspark/tests/test_memory_profiler.py | 160 +++ python/pyspark/tests/test_profiler.py| 57 - 8 files changed, 570 insertions(+), 43 deletions(-) diff --git a/dev/requirements.txt b/dev/requirements.txt index 2f32066d6a8..914c26b1fa1 100644 --- a/dev/requirements.txt +++ b/dev/requirements.txt @@ -10,6 +10,7 @@ plotly mlflow>=1.0 sklearn matplotlib<3.3.0 +memory-profiler==0.60.0 # PySpark test dependencies unittest-xml-reporting diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index a439b4cbbed..159990fb33a 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -418,6 +418,7 @@ pyspark_core = Module( "pyspark.tests.test_daemon", "pyspark.tests.test_install_spark", "pyspark.tests.test_join", +"pyspark.tests.test_memory_profiler", "pyspark.tests.test_profiler", "pyspark.tests.test_rdd", "pyspark.tests.test_rddbarrier", diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 45e683efe7a..9a7a8f46e84 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -67,7 +67,7 @@ from pyspark.rdd import RDD, _load_from_socket from pyspark.taskcontext import TaskContext from pyspark.traceback_utils import CallSite, first_spark_call from pyspark.status import StatusTracker -from pyspark.profiler import ProfilerCollector, BasicProfiler, UDFBasicProfiler +from pyspark.profiler import ProfilerCollector, BasicProfiler, UDFBasicProfiler, MemoryProfiler from py4j.java_gateway import is_instance_of, JavaGateway, JavaObject, JVMView if TYPE_CHECKING: @@ -177,6 +177,7 @@ class SparkContext: jsc: Optional[JavaObject] = None, profiler_cls: Type[BasicProfiler] = BasicProfiler, udf_profiler_cls: Type[UDFBasicProfiler] = UDFBasicProfiler, +memory_profiler_cls: Type[MemoryProfiler] = MemoryProfiler, ): if conf is None or conf.get("spark.executor.allowSparkContext", "false").lower() != "true": @@ -204,6 +205,7 @@ class SparkContext:
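A hedged usage sketch for the new executor-side memory profiling. The configuration key comes from the description; the result is assumed to surface through the existing profiler hooks such as `SparkContext.show_profiles()`, and the `memory-profiler` package must be installed on the executors:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = (SparkSession.builder
         .config("spark.python.profile.memory", "true")  # enable executor memory profiling
         .getOrCreate())

@udf("string")
def repeat_name(name):
    # Per-line memory usage of this UDF body is what the profiler reports.
    return name * 100

spark.range(1000).selectExpr("CAST(id AS STRING) AS name") \
     .select(repeat_name("name")).collect()

# Assumed to print line-by-line memory usage gathered from the executors,
# analogous to the existing CPU profiler output.
spark.sparkContext.show_profiles()
```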
[spark] branch master updated: [SPARK-41077][CONNECT][PYTHON][REFACTORING] Rename `ColumnRef` to `Column` in Python client implementation
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7d320d784a2 [SPARK-41077][CONNECT][PYTHON][REFACTORING] Rename `ColumnRef` to `Column` in Python client implementation 7d320d784a2 is described below commit 7d320d784a2d637fd1a8fd0798da3d2a39b4d7cd Author: Rui Wang AuthorDate: Fri Nov 11 11:03:04 2022 +0900 [SPARK-41077][CONNECT][PYTHON][REFACTORING] Rename `ColumnRef` to `Column` in Python client implementation ### What changes were proposed in this pull request? Connect python client uses `ColumnRef` to represent columns in API (e.g. `df.name`). Current PySpark uses `Class Column` for the same thing. In this case, we can align Connect with PySpark, which can help existing PySpark users to reuse their code for Spark Connect python client as much as possible (minimize the code change). ### Why are the changes needed? This is to help existing PySpark users to reuse their code for Spark Connect python client as much as possible (minimize the code change). ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? Existing UT Closes #38586 from amaliujia/SPARK-41077. Authored-by: Rui Wang Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/connect/column.py | 12 python/pyspark/sql/connect/dataframe.py| 34 +++--- python/pyspark/sql/connect/function_builder.py | 4 +-- python/pyspark/sql/connect/functions.py| 6 ++-- python/pyspark/sql/connect/plan.py | 14 - python/pyspark/sql/connect/typing/__init__.pyi | 4 +-- .../connect/test_connect_column_expressions.py | 6 ++-- 7 files changed, 40 insertions(+), 40 deletions(-) diff --git a/python/pyspark/sql/connect/column.py b/python/pyspark/sql/connect/column.py index 3c9f8c3d736..417bc7097de 100644 --- a/python/pyspark/sql/connect/column.py +++ b/python/pyspark/sql/connect/column.py @@ -30,8 +30,8 @@ if TYPE_CHECKING: def _bin_op( name: str, doc: str = "binary function", reverse: bool = False -) -> Callable[["ColumnRef", Any], "Expression"]: -def _(self: "ColumnRef", other: Any) -> "Expression": +) -> Callable[["Column", Any], "Expression"]: +def _(self: "Column", other: Any) -> "Expression": if isinstance(other, get_args(PrimitiveType)): other = LiteralExpression(other) if not reverse: @@ -163,15 +163,15 @@ class LiteralExpression(Expression): return f"Literal({self._value})" -class ColumnRef(Expression): +class Column(Expression): """Represents a column reference. There is no guarantee that this column actually exists. In the context of this project, we refer by its name and treat it as an unresolved attribute. 
Attributes that have the same fully qualified name are identical""" @classmethod -def from_qualified_name(cls, name: str) -> "ColumnRef": -return ColumnRef(name) +def from_qualified_name(cls, name: str) -> "Column": +return Column(name) def __init__(self, name: str) -> None: super().__init__() @@ -198,7 +198,7 @@ class ColumnRef(Expression): class SortOrder(Expression): -def __init__(self, col: ColumnRef, ascending: bool = True, nullsLast: bool = True) -> None: +def __init__(self, col: Column, ascending: bool = True, nullsLast: bool = True) -> None: super().__init__() self.ref = col self.ascending = ascending diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index e3116ea1250..0c19c67309d 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -31,7 +31,7 @@ import pandas import pyspark.sql.connect.plan as plan from pyspark.sql.connect.column import ( -ColumnRef, +Column, Expression, LiteralExpression, ) @@ -44,7 +44,7 @@ if TYPE_CHECKING: from pyspark.sql.connect.typing import ColumnOrString, ExpressionOrString from pyspark.sql.connect.client import RemoteSparkSession -ColumnOrName = Union[ColumnRef, str] +ColumnOrName = Union[Column, str] class GroupingFrame(object): @@ -52,9 +52,9 @@ class GroupingFrame(object): MeasuresType = Union[Sequence[Tuple["ExpressionOrString", str]], Dict[str, str]] OptMeasuresType = Optional[MeasuresType] -def __init__(self, df: "DataFrame", *grouping_cols: Union[ColumnRef, str]) -> None: +def __init__(self, df: "DataFrame", *grouping_cols: Union[Column, str]) -> None: self._df = df -self._grouping_cols = [x if isinstance(x, ColumnRef) else df[x] for x in grouping_cols] +self._grouping_cols = [x if isinstance(x, Column) else df[x] for x in
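The rename is mechanical, but it lets column-handling code read the same against classic PySpark and the Connect client. A small hedged sketch using the import path from the diff:

```python
# Both classic PySpark and the Connect client now expose a class named Column.
from pyspark.sql.connect.column import Column

def as_column(df, col_or_name):
    """Accept either a Column or a column name, mirroring the helper in dataframe.py."""
    return col_or_name if isinstance(col_or_name, Column) else df[col_or_name]
```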
[spark] branch master updated: [SPARK-41063][BUILD] Clean all except files in Git repository before running Mima
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 82475b1cab3 [SPARK-41063][BUILD] Clean all except files in Git repository before running Mima 82475b1cab3 is described below commit 82475b1cab37e7e3b4631e8cdb83741a101fdec1 Author: Hyukjin Kwon AuthorDate: Fri Nov 11 10:40:31 2022 +0900 [SPARK-41063][BUILD] Clean all except files in Git repository before running Mima ### What changes were proposed in this pull request? This PR proposes to clean all (except the files in Git repository) before running Mima. ### Why are the changes needed? For an unknown reason, the compilation goes into an infinite loop at Scala 2.13 build (see https://github.com/apache/spark/actions/runs/3422432297/jobs/5699849548), presumably some leftovers some SBT or Zinc (https://github.com/sbt/sbt/issues/6183). ### Does this PR introduce _any_ user-facing change? No, dev-only. ### How was this patch tested? CI in this PR should test it out. Closes #38599 from HyukjinKwon/SPARK-41063. Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- dev/run-tests.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dev/run-tests.py b/dev/run-tests.py index fc34b4c61b0..366b814f859 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -315,6 +315,7 @@ def detect_binary_inop_with_mima(extra_profiles): "[info] Detecting binary incompatibilities with MiMa using SBT with these profiles: ", profiles, ) +run_cmd(["git", "clean", "-fxd"]) # See https://github.com/sbt/sbt/issues/6183 run_cmd([os.path.join(SPARK_HOME, "dev", "mima"), profiles]) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (b5fbdeb5483 -> fed8acfcfc3)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from b5fbdeb5483 [SPARK-40901][CORE] Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path add fed8acfcfc3 [SPARK-41005][CONNECT][PYTHON] Arrow-based collect No new revisions were added by this update. Summary of changes: .../src/main/protobuf/spark/connect/base.proto | 7 +- .../service/SparkConnectStreamHandler.scala| 105 +++-- python/pyspark/sql/connect/client.py | 11 ++- python/pyspark/sql/connect/proto/base_pb2.py | 36 +++ python/pyspark/sql/connect/proto/base_pb2.pyi | 39 ++-- .../sql/tests/connect/test_connect_basic.py| 12 +++ .../sql/execution/arrow/ArrowConverters.scala | 86 + 7 files changed, 233 insertions(+), 63 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-40901][CORE] Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b5fbdeb5483 [SPARK-40901][CORE] Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path b5fbdeb5483 is described below commit b5fbdeb5483fa4b3c6102a99fa84a0677e145c42 Author: Swaminathan Balachandran AuthorDate: Thu Nov 10 18:16:44 2022 -0600 [SPARK-40901][CORE] Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path ### What changes were proposed in this pull request? Parsing the configuration value of the config value of spark.driver.log.dfsDir using Hadoop FS Path & extracting path from the URI should fix the problem ### Why are the changes needed? Currently when one passes the Absolute URI to configured filesystem, the code would fail while trying to copy the local log file to the filesystem directory path. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test cases Closes #38377 from swamirishi/SPARK-40901. Authored-by: Swaminathan Balachandran Signed-off-by: Mridul gmail.com> --- .../apache/spark/util/logging/DriverLogger.scala | 8 ++--- .../spark/util/logging/DriverLoggerSuite.scala | 42 +++--- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala b/core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala index bb57e032563..4f56cf24a2f 100644 --- a/core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala +++ b/core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala @@ -126,13 +126,13 @@ private[spark] class DriverLogger(conf: SparkConf) extends Logging { throw new RuntimeException(s"${rootDir} does not exist." 
+ s" Please create this dir in order to persist driver logs") } - val dfsLogFile: String = FileUtils.getFile(rootDir, appId -+ DriverLogger.DRIVER_LOG_FILE_SUFFIX).getAbsolutePath() + val dfsLogFile: Path = fileSystem.makeQualified(new Path(rootDir, appId ++ DriverLogger.DRIVER_LOG_FILE_SUFFIX)) try { inStream = new BufferedInputStream(new FileInputStream(localLogFile)) -outputStream = SparkHadoopUtil.createFile(fileSystem, new Path(dfsLogFile), +outputStream = SparkHadoopUtil.createFile(fileSystem, dfsLogFile, conf.get(DRIVER_LOG_ALLOW_EC)) -fileSystem.setPermission(new Path(dfsLogFile), LOG_FILE_PERMISSIONS) +fileSystem.setPermission(dfsLogFile, LOG_FILE_PERMISSIONS) } catch { case e: Exception => JavaUtils.closeQuietly(inStream) diff --git a/core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala index bd7ec242a93..9599bd29188 100644 --- a/core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.util.logging import java.io.File import org.apache.commons.io.FileUtils +import org.apache.hadoop.fs.Path import org.apache.spark._ import org.apache.spark.{SparkContext, SparkFunSuite} @@ -65,12 +66,43 @@ class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext { assert(dfsFile.length() > 0) } + test("SPARK-40901: driver logs are persisted locally and synced to dfs when log " + +"dir is absolute URI") { +val sparkConf = new SparkConf() +sparkConf.set(DRIVER_LOG_DFS_DIR, "file://" + rootDfsDir.getAbsolutePath()) +val sc = getSparkContext(sparkConf) +val app_id = sc.applicationId +// Run a simple spark application +sc.parallelize(1 to 1000).count() + +// Assert driver log file exists +val rootDir = Utils.getLocalDir(sc.getConf) +val driverLogsDir = FileUtils.getFile(rootDir, DriverLogger.DRIVER_LOG_DIR) +assert(driverLogsDir.exists()) +val files = driverLogsDir.listFiles() +assert(files.length === 1) +assert(files(0).getName.equals(DriverLogger.DRIVER_LOG_FILE)) + +sc.stop() +assert(!driverLogsDir.exists()) +assert(sc.getConf.get(DRIVER_LOG_DFS_DIR).get.startsWith("file:///")) +val dfsFile = new Path(sc.getConf.get(DRIVER_LOG_DFS_DIR).get + + "/" + app_id + DriverLogger.DRIVER_LOG_FILE_SUFFIX) +val dfsFileStatus = dfsFile.getFileSystem(sc.hadoopConfiguration).getFileStatus(dfsFile) + +assert(dfsFileStatus.isFile) +assert(dfsFileStatus.getLen > 0) + } + private def getSparkContext(): SparkContext = { -val conf = new SparkConf() -conf.set(DRIVER_LOG_DFS_DIR, rootDfsDir.getAbsolutePath()) -conf.set(DRIVER_LOG_PERSISTTODFS, true) -
[spark] branch master updated: [SPARK-41055][SQL] Rename `_LEGACY_ERROR_TEMP_2424` to `GROUP_BY_AGGREGATE`
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6aac34315de [SPARK-41055][SQL] Rename `_LEGACY_ERROR_TEMP_2424` to `GROUP_BY_AGGREGATE` 6aac34315de is described below commit 6aac34315de2ee3d48fe2e1819a02600b3b22d22 Author: itholic AuthorDate: Thu Nov 10 19:30:11 2022 +0300 [SPARK-41055][SQL] Rename `_LEGACY_ERROR_TEMP_2424` to `GROUP_BY_AGGREGATE` ### What changes were proposed in this pull request? This PR proposes to rename `_LEGACY_ERROR_TEMP_2424` to `GROUP_BY_AGGREGATE` ### Why are the changes needed? To use proper error class name. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? ``` ./build/sbt “sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*” ``` Closes #38569 from itholic/SPARK-41055. Lead-authored-by: itholic Co-authored-by: Haejoon Lee <44108233+itho...@users.noreply.github.com> Signed-off-by: Max Gekk --- core/src/main/resources/error/error-classes.json | 10 +- .../spark/sql/catalyst/analysis/CheckAnalysis.scala| 2 +- .../test/resources/sql-tests/results/group-by.sql.out | 2 +- .../sql-tests/results/udf/udf-group-by.sql.out | 2 +- .../org/apache/spark/sql/DataFrameAggregateSuite.scala | 11 +++ .../org/apache/spark/sql/DataFramePivotSuite.scala | 18 ++ 6 files changed, 25 insertions(+), 20 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 7c33c1059ae..dcc6effb30f 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -469,6 +469,11 @@ "Grouping sets size cannot be greater than " ] }, + "GROUP_BY_AGGREGATE" : { +"message" : [ + "Aggregate functions are not allowed in GROUP BY, but found ." +] + }, "GROUP_BY_POS_OUT_OF_RANGE" : { "message" : [ "GROUP BY position is not in select list (valid range is [1, ])." @@ -5008,11 +5013,6 @@ "Correlated scalar subquery '' is neither present in the group by, nor in an aggregate function. Add it to group by using ordinal position or wrap it in first() (or first_value) if you don't care which value you get." ] }, - "_LEGACY_ERROR_TEMP_2424" : { -"message" : [ - "aggregate functions are not allowed in GROUP BY, but found " -] - }, "_LEGACY_ERROR_TEMP_2425" : { "message" : [ "expression cannot be used as a grouping expression because its data type is not an orderable data type." 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 9e41bcebe47..1ce1fcd0144 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -413,7 +413,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB def checkValidGroupingExprs(expr: Expression): Unit = { if (expr.exists(_.isInstanceOf[AggregateExpression])) { expr.failAnalysis( - errorClass = "_LEGACY_ERROR_TEMP_2424", + errorClass = "GROUP_BY_AGGREGATE", messageParameters = Map("sqlExpr" -> expr.sql)) } diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out index 6ccc0c34ff0..1075a6ab887 100644 --- a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out @@ -213,7 +213,7 @@ struct<> -- !query output org.apache.spark.sql.AnalysisException { - "errorClass" : "_LEGACY_ERROR_TEMP_2424", + "errorClass" : "GROUP_BY_AGGREGATE", "messageParameters" : { "sqlExpr" : "count(testdata.b)" }, diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out index 4d336adc412..093cdcac25a 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out @@ -190,7 +190,7 @@ struct<> -- !query output org.apache.spark.sql.AnalysisException { - "errorClass" : "_LEGACY_ERROR_TEMP_2424", + "errorClass" : "GROUP_BY_AGGREGATE", "messageParameters" : { "sqlExpr" : "CAST(udf(cast(count(b) as string)) AS BIGINT)" }, diff --git
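For reference, a hedged PySpark example of a query that the renamed `GROUP_BY_AGGREGATE` class is raised for, mirroring the `count(testdata.b)` case in the updated SQL test results:

```python
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder.getOrCreate()
spark.range(10).selectExpr("id AS a", "id % 3 AS b").createOrReplaceTempView("testData")

try:
    # Aggregate functions are not allowed inside GROUP BY expressions.
    spark.sql("SELECT count(b) FROM testData GROUP BY count(b)").collect()
except AnalysisException as e:
    print(e)  # expected to mention GROUP_BY_AGGREGATE and the offending expression
```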
[spark] branch branch-3.3 updated: [SPARK-41089][YARN][SHUFFLE] Relocate Netty native arm64 libs
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new bea58e4b863 [SPARK-41089][YARN][SHUFFLE] Relocate Netty native arm64 libs bea58e4b863 is described below commit bea58e4b8634fbe3497431cb50351aa186107fb3 Author: Cheng Pan AuthorDate: Thu Nov 10 08:57:56 2022 -0600 [SPARK-41089][YARN][SHUFFLE] Relocate Netty native arm64 libs ### What changes were proposed in this pull request? SPARK-27610 relocated the netty x86 native libs, and the recent version netty ships arm64 native libs as well, we should do same thing to make it works on arm64 platform. ### Why are the changes needed? Align arm64 behavior w/ x86 ### Does this PR introduce _any_ user-facing change? Yes, bug fix for ARM64 platform. ### How was this patch tested? Before patch ``` ➜ apache-spark git:(SPARK-41089) ll common/network-yarn/target/exploded/META-INF/native total 752 -rw-r--r-- 1 chengpan staff 101K Oct 11 23:24 libnetty_transport_native_epoll_aarch_64.so -rw-r--r-- 1 chengpan staff94K Oct 11 17:57 libnetty_transport_native_kqueue_aarch_64.jnilib -rw-r--r-- 1 chengpan staff93K Oct 11 23:27 liborg_sparkproject_netty_transport_native_epoll_x86_64.so -rw-r--r-- 1 chengpan staff77K Oct 11 17:51 liborg_sparkproject_netty_transport_native_kqueue_x86_64.jnilib drwxr-xr-x 3 chengpan staff96B Nov 9 13:46 linux32 drwxr-xr-x 3 chengpan staff96B Nov 9 13:46 linux64 drwxr-xr-x 3 chengpan staff96B Nov 9 13:46 osx drwxr-xr-x 3 chengpan staff96B Nov 9 13:46 windows32 drwxr-xr-x 3 chengpan staff96B Nov 9 13:46 windows64 ``` After patch ``` ➜ apache-spark git:(SPARK-41089) ll common/network-yarn/target/exploded/META-INF/native total 752 -rw-r--r-- 1 chengpan staff 101K Oct 11 23:24 liborg_sparkproject_netty_transport_native_epoll_aarch_64.so -rw-r--r-- 1 chengpan staff93K Oct 11 23:27 liborg_sparkproject_netty_transport_native_epoll_x86_64.so -rw-r--r-- 1 chengpan staff94K Oct 11 17:57 liborg_sparkproject_netty_transport_native_kqueue_aarch_64.jnilib -rw-r--r-- 1 chengpan staff77K Oct 11 17:51 liborg_sparkproject_netty_transport_native_kqueue_x86_64.jnilib drwxr-xr-x 3 chengpan staff96B Nov 10 12:07 linux32 drwxr-xr-x 3 chengpan staff96B Nov 10 12:07 linux64 drwxr-xr-x 3 chengpan staff96B Nov 10 12:07 osx drwxr-xr-x 3 chengpan staff96B Nov 10 12:07 windows32 drwxr-xr-x 3 chengpan staff96B Nov 10 12:07 windows64 ``` Closes #38593 from pan3793/SPARK-41089. Authored-by: Cheng Pan Signed-off-by: Sean Owen (cherry picked from commit c72d39990182ad2207c8cdd523af06ee4dc02fc5) Signed-off-by: Sean Owen --- common/network-yarn/pom.xml | 4 1 file changed, 4 insertions(+) diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index 14d41802a8b..81146a36c98 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -174,6 +174,10 @@ tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_transport_native_epoll_x86_64.so" /> + + - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (c5d27603f29 -> c72d3999018)
This is an automated email from the ASF dual-hosted git repository. srowen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from c5d27603f29 [SPARK-41064][CONNECT][PYTHON] Implement `DataFrame.crosstab` and `DataFrame.stat.crosstab` add c72d3999018 [SPARK-41089][YARN][SHUFFLE] Relocate Netty native arm64 libs No new revisions were added by this update. Summary of changes: common/network-yarn/pom.xml | 4 1 file changed, 4 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41064][CONNECT][PYTHON] Implement `DataFrame.crosstab` and `DataFrame.stat.crosstab`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c5d27603f29 [SPARK-41064][CONNECT][PYTHON] Implement `DataFrame.crosstab` and `DataFrame.stat.crosstab` c5d27603f29 is described below commit c5d27603f29437f1686cac70727594c19410a273 Author: Ruifeng Zheng AuthorDate: Thu Nov 10 18:15:54 2022 +0800 [SPARK-41064][CONNECT][PYTHON] Implement `DataFrame.crosstab` and `DataFrame.stat.crosstab` ### What changes were proposed in this pull request? Implement `DataFrame.crosstab` and `DataFrame.stat.crosstab` ### Why are the changes needed? for api coverage ### Does this PR introduce _any_ user-facing change? yes, new api ### How was this patch tested? added ut Closes #38578 from zhengruifeng/connect_df_crosstab. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .../main/protobuf/spark/connect/relations.proto| 19 +++ .../org/apache/spark/sql/connect/dsl/package.scala | 17 +++ .../sql/connect/planner/SparkConnectPlanner.scala | 10 ++ .../connect/planner/SparkConnectProtoSuite.scala | 6 + python/pyspark/sql/connect/dataframe.py| 62 ++ python/pyspark/sql/connect/plan.py | 32 + python/pyspark/sql/connect/proto/relations_pb2.py | 134 +++-- python/pyspark/sql/connect/proto/relations_pb2.pyi | 50 .../sql/tests/connect/test_connect_plan_only.py| 10 ++ 9 files changed, 274 insertions(+), 66 deletions(-) diff --git a/connector/connect/src/main/protobuf/spark/connect/relations.proto b/connector/connect/src/main/protobuf/spark/connect/relations.proto index b3613fc908d..639d1bafce5 100644 --- a/connector/connect/src/main/protobuf/spark/connect/relations.proto +++ b/connector/connect/src/main/protobuf/spark/connect/relations.proto @@ -52,6 +52,7 @@ message Relation { // stat functions StatSummary summary = 100; +StatCrosstab crosstab = 101; Unknown unknown = 999; } @@ -284,6 +285,24 @@ message StatSummary { repeated string statistics = 2; } +// Computes a pair-wise frequency table of the given columns. Also known as a contingency table. +// It will invoke 'Dataset.stat.crosstab' (same as 'StatFunctions.crossTabulate') +// to compute the results. +message StatCrosstab { + // (Required) The input relation. + Relation input = 1; + + // (Required) The name of the first column. + // + // Distinct items will make the first item of each row. + string col1 = 2; + + // (Required) The name of the second column. + // + // Distinct items will make the column names of the DataFrame. + string col2 = 3; +} + // Rename columns on the input relation by the same length of names. message RenameColumnsBySameLengthNames { // Required. The input relation. 
diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala index 381cbf7a9a8..5e7a94da347 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala @@ -227,6 +227,21 @@ package object dsl { } } +implicit class DslStatFunctions(val logicalPlan: Relation) { + def crosstab(col1: String, col2: String): Relation = { +Relation + .newBuilder() + .setCrosstab( +proto.StatCrosstab + .newBuilder() + .setInput(logicalPlan) + .setCol1(col1) + .setCol2(col2) + .build()) + .build() + } +} + implicit class DslLogicalPlan(val logicalPlan: Relation) { def select(exprs: Expression*): Relation = { Relation @@ -463,6 +478,8 @@ package object dsl { Repartition.newBuilder().setInput(logicalPlan).setNumPartitions(num).setShuffle(true)) .build() + def stat: DslStatFunctions = new DslStatFunctions(logicalPlan) + def summary(statistics: String*): Relation = { Relation .newBuilder() diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 148f5569683..04ce880a925 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -67,6 +67,8 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) { transformSubqueryAlias(rel.getSubqueryAlias) case proto.Relation.RelTypeCase.REPARTITION =>
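The new Connect plumbing maps onto the existing `DataFrame.stat.crosstab` behavior. A short usage sketch with the long-standing PySpark API (availability through the Connect client is what this PR adds):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, "a"), (1, "b"), (2, "a"), (2, "a")], ["key", "value"])

# Pair-wise frequency (contingency) table: distinct 'key' values become rows,
# distinct 'value' values become columns. Row order may vary.
df.stat.crosstab("key", "value").show()
# +---------+---+---+
# |key_value|  a|  b|
# +---------+---+---+
# |        1|  1|  1|
# |        2|  2|  0|
# +---------+---+---+
```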
[spark] branch master updated: [SPARK-41034][CONNECT][PYTHON][FOLLOW-UP] Fix mypy annotations test
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c2a8e48e70a [SPARK-41034][CONNECT][PYTHON][FOLLOW-UP] Fix mypy annotations test c2a8e48e70a is described below commit c2a8e48e70abfb6bd101c99c5a0f6017151fc85e Author: Ruifeng Zheng AuthorDate: Thu Nov 10 18:25:50 2022 +0900 [SPARK-41034][CONNECT][PYTHON][FOLLOW-UP] Fix mypy annotations test ### What changes were proposed in this pull request? Fix python mypy ### Why are the changes needed? https://github.com/apache/spark/pull/38541 breaks the linter ``` starting mypy annotations test... annotations failed mypy checks: python/pyspark/sql/connect/plan.py:683: error: Argument 1 to "plan" of "LogicalPlan" has incompatible type "Optional[RemoteSparkSession]"; expected "RemoteSparkSession" [arg-type] python/pyspark/sql/connect/plan.py:815: error: Argument 1 to "plan" of "LogicalPlan" has incompatible type "Optional[RemoteSparkSession]"; expected "RemoteSparkSession" [arg-type] Found 2 errors in 1 file (checked 369 source files) 1 ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? local test Closes #38597 from zhengruifeng/fix_py_lint_1. Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/connect/plan.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index db76d84fea1..047c9f2ce0f 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -677,7 +677,7 @@ class Repartition(LogicalPlan): self._num_partitions = num_partitions self._shuffle = shuffle -def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation: +def plan(self, session: "RemoteSparkSession") -> proto.Relation: rel = proto.Relation() if self._child is not None: rel.repartition.input.CopyFrom(self._child.plan(session)) @@ -809,7 +809,7 @@ class StatSummary(LogicalPlan): super().__init__(child) self.statistics = statistics -def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation: +def plan(self, session: "RemoteSparkSession") -> proto.Relation: assert self._child is not None plan = proto.Relation() plan.summary.input.CopyFrom(self._child.plan(session)) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41092][SQL] Do not use identifier to match interval units
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 012d99d3a3d [SPARK-41092][SQL] Do not use identifier to match interval units 012d99d3a3d is described below commit 012d99d3a3d92819d5a26cddcfd566c46380b952 Author: Wenchen Fan AuthorDate: Thu Nov 10 11:36:21 2022 +0300 [SPARK-41092][SQL] Do not use identifier to match interval units ### What changes were proposed in this pull request? The current antlr-based SQL parser is pretty fragile due to the fact that we make the antlr parser rule pretty flexible and push more parsing logic to the Scala side (`AstBuilder`). A tiny change to the antlr parser rule may break the parser unexpectedly. As an example, in https://github.com/apache/spark/pull/38404 , we added a new parser rule to extend the INSERT syntax, and it breaks interval literal. `select b + interval '1 month' from values (1, 1)` can't be parsed after https://g [...] This PR makes the interval literal parser rule stricter. Now it lists all the allowed interval units instead of matching an identifier. This fixes the issue we hit in https://github.com/apache/spark/pull/38404 . In the future, we can revisit other parser rules and try to rely on antlr more to do the parsing work. ### Why are the changes needed? fix parser issues we hit in https://github.com/apache/spark/pull/38404 ### Does this PR introduce _any_ user-facing change? The error message is changed a little bit for `SELECT INTERVAL 1 wrong_unit`. ### How was this patch tested? existing tests Closes #38583 from cloud-fan/parser. Authored-by: Wenchen Fan Signed-off-by: Max Gekk --- docs/sql-ref-ansi-compliance.md| 11 +++ .../spark/sql/catalyst/parser/SqlBaseLexer.g4 | 11 +++ .../spark/sql/catalyst/parser/SqlBaseParser.g4 | 36 -- .../sql-tests/results/ansi/interval.sql.out| 15 +++-- .../resources/sql-tests/results/interval.sql.out | 15 +++-- 5 files changed, 66 insertions(+), 22 deletions(-) diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md index a59d145d551..1501e14c604 100644 --- a/docs/sql-ref-ansi-compliance.md +++ b/docs/sql-ref-ansi-compliance.md @@ -407,6 +407,7 @@ Below is a list of all the keywords in Spark SQL. |DATEADD|non-reserved|non-reserved|non-reserved| |DATEDIFF|non-reserved|non-reserved|non-reserved| |DAY|non-reserved|non-reserved|non-reserved| +|DAYS|non-reserved|non-reserved|non-reserved| |DAYOFYEAR|non-reserved|non-reserved|non-reserved| |DBPROPERTIES|non-reserved|non-reserved|non-reserved| |DEFAULT|non-reserved|non-reserved|non-reserved| @@ -456,6 +457,7 @@ Below is a list of all the keywords in Spark SQL. |GROUPING|non-reserved|non-reserved|reserved| |HAVING|reserved|non-reserved|reserved| |HOUR|non-reserved|non-reserved|non-reserved| +|HOURS|non-reserved|non-reserved|non-reserved| |IF|non-reserved|non-reserved|not a keyword| |IGNORE|non-reserved|non-reserved|non-reserved| |IMPORT|non-reserved|non-reserved|non-reserved| @@ -495,13 +497,19 @@ Below is a list of all the keywords in Spark SQL. 
|MATCHED|non-reserved|non-reserved|non-reserved| |MERGE|non-reserved|non-reserved|non-reserved| |MICROSECOND|non-reserved|non-reserved|non-reserved| +|MICROSECONDS|non-reserved|non-reserved|non-reserved| |MILLISECOND|non-reserved|non-reserved|non-reserved| +|MILLISECONDS|non-reserved|non-reserved|non-reserved| |MINUTE|non-reserved|non-reserved|non-reserved| +|MINUTES|non-reserved|non-reserved|non-reserved| |MINUS|non-reserved|strict-non-reserved|non-reserved| |MONTH|non-reserved|non-reserved|non-reserved| +|MONTHS|non-reserved|non-reserved|non-reserved| |MSCK|non-reserved|non-reserved|non-reserved| |NAMESPACE|non-reserved|non-reserved|non-reserved| |NAMESPACES|non-reserved|non-reserved|non-reserved| +|NANOSECOND|non-reserved|non-reserved|non-reserved| +|NANOSECONDS|non-reserved|non-reserved|non-reserved| |NATURAL|reserved|strict-non-reserved|reserved| |NO|non-reserved|non-reserved|reserved| |NOT|reserved|non-reserved|reserved| @@ -565,6 +573,7 @@ Below is a list of all the keywords in Spark SQL. |SCHEMA|non-reserved|non-reserved|non-reserved| |SCHEMAS|non-reserved|non-reserved|non-reserved| |SECOND|non-reserved|non-reserved|non-reserved| +|SECONDS|non-reserved|non-reserved|non-reserved| |SELECT|reserved|non-reserved|reserved| |SEMI|non-reserved|strict-non-reserved|non-reserved| |SEPARATED|non-reserved|non-reserved|non-reserved| @@ -631,10 +640,12 @@ Below is a list of all the keywords in Spark SQL. |VIEW|non-reserved|non-reserved|non-reserved| |VIEWS|non-reserved|non-reserved|non-reserved| |WEEK|non-reserved|non-reserved|non-reserved|
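A hedged end-to-end check of the behavior this parser change protects, using a variant of the interval-literal query from the description plus a made-up unit that the stricter grammar rejects:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# An interval literal combined with a column still parses and evaluates.
spark.sql(
    "SELECT b + INTERVAL '1 month' AS next_month "
    "FROM VALUES (1, DATE'2022-01-01') AS t(a, b)"
).show()

# A token that is not one of the listed interval units is now rejected by the
# grammar instead of being matched as an identifier and failing later.
try:
    spark.sql("SELECT INTERVAL 1 fortnight").collect()
except Exception as e:
    print(type(e).__name__)  # expected: a parse error
```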