[spark] branch master updated: [SPARK-41102][CONNECT] Merge SparkConnectPlanner and SparkConnectCommandPlanner

2022-11-10 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new fd1e0d028cb [SPARK-41102][CONNECT] Merge SparkConnectPlanner and 
SparkConnectCommandPlanner
fd1e0d028cb is described below

commit fd1e0d028cb7e26921cd66a421c00d7260092b23
Author: Rui Wang 
AuthorDate: Fri Nov 11 15:48:21 2022 +0800

[SPARK-41102][CONNECT] Merge SparkConnectPlanner and 
SparkConnectCommandPlanner

### What changes were proposed in this pull request?

In the past, the Connect server separated `Command` and `Relation` handling into
two planners. However, as we add new APIs, there are cases where a `Command`
still has an input that is a `Relation`, so converting the `Command` needs access
to the `Relation` conversion logic. View creation is one such case, and SQL DDL
and DML will usually follow the same pattern.

This PR refactors the code so that the handling of `Command` and `Relation` lives
in the same planner.

### Why are the changes needed?

Refactoring.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

Existing UT

Closes #38604 from amaliujia/refactor-planners.

Authored-by: Rui Wang 
Signed-off-by: Wenchen Fan 
---
 .../command/SparkConnectCommandPlanner.scala   | 174 -
 .../sql/connect/planner/SparkConnectPlanner.scala  | 152 +-
 .../sql/connect/service/SparkConnectService.scala  |   2 +-
 .../service/SparkConnectStreamHandler.scala|   9 +-
 .../planner/SparkConnectCommandPlannerSuite.scala  | 160 ---
 .../connect/planner/SparkConnectPlannerSuite.scala |  25 +--
 .../connect/planner/SparkConnectProtoSuite.scala   | 128 ++-
 7 files changed, 291 insertions(+), 359 deletions(-)

diff --git 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala
 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala
deleted file mode 100644
index 11090976c7f..000
--- 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/command/SparkConnectCommandPlanner.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.connect.command
-
-import scala.collection.JavaConverters._
-
-import com.google.common.collect.{Lists, Maps}
-
-import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
-import org.apache.spark.connect.proto
-import org.apache.spark.connect.proto.WriteOperation
-import org.apache.spark.sql.{Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView}
-import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.connect.planner.{DataTypeProtoConverter, 
SparkConnectPlanner}
-import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.command.CreateViewCommand
-import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
-import org.apache.spark.sql.types.StringType
-
-final case class InvalidCommandInput(
-private val message: String = "",
-private val cause: Throwable = null)
-extends Exception(message, cause)
-
-class SparkConnectCommandPlanner(session: SparkSession, command: 
proto.Command) {
-
-  lazy val pythonExec =
-sys.env.getOrElse("PYSPARK_PYTHON", 
sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3"))
-
-  def process(): Unit = {
-command.getCommandTypeCase match {
-  case proto.Command.CommandTypeCase.CREATE_FUNCTION =>
-handleCreateScalarFunction(command.getCreateFunction)
-  case proto.Command.CommandTypeCase.WRITE_OPERATION =>
-handleWriteOperation(command.getWriteOperation)
-  case proto.Command.CommandTypeCase.CREATE_DATAFRAME_VIEW =>
-handleCreateViewCommand(command.getCreateDataframeView)
-  case _ => throw new UnsupportedOperationException(s"$command not 
supported.")
-}
-  }

[spark] branch master updated: [SPARK-41095][SQL] Convert unresolved operators to internal errors

2022-11-10 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 38897b1ce96 [SPARK-41095][SQL] Convert unresolved operators to 
internal errors
38897b1ce96 is described below

commit 38897b1ce96a1e24629875619c165b4fd1fa2d8f
Author: Max Gekk 
AuthorDate: Fri Nov 11 09:37:10 2022 +0300

[SPARK-41095][SQL] Convert unresolved operators to internal errors

### What changes were proposed in this pull request?
In the PR, I propose to interpret the `unresolved operator` issue as an
internal error and throw `SparkException` with the error class `INTERNAL_ERROR`.

### Why are the changes needed?
The issues that lead to an `unresolved operator` should be surfaced earlier with
proper user-facing errors. If we reach the point where we cannot resolve an
operator, we should treat this as a Spark SQL internal error.

### Does this PR introduce _any_ user-facing change?
No, in most regular cases.

### How was this patch tested?
By running the affected and modified test suites:
```
$ PYSPARK_PYTHON=python3 build/sbt "sql/testOnly 
org.apache.spark.sql.SQLQueryTestSuite"
$ build/sbt "test:testOnly *AnalysisErrorSuite"
```

Closes #38582 from MaxGekk/unresolved-op-internal-error.

Authored-by: Max Gekk 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json |  5 -
 core/src/main/scala/org/apache/spark/SparkException.scala| 10 --
 .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala   |  8 +---
 .../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 12 
 4 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 2b626ba5761..63978e6be66 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -5107,11 +5107,6 @@
   "Invalid expressions: []"
 ]
   },
-  "_LEGACY_ERROR_TEMP_2442" : {
-"message" : [
-  "unresolved operator "
-]
-  },
   "_LEGACY_ERROR_TEMP_2443" : {
 "message" : [
   "Multiple definitions of observed metrics named '': "
diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala 
b/core/src/main/scala/org/apache/spark/SparkException.scala
index 03938444e12..2f05b2ad6a7 100644
--- a/core/src/main/scala/org/apache/spark/SparkException.scala
+++ b/core/src/main/scala/org/apache/spark/SparkException.scala
@@ -68,11 +68,17 @@ class SparkException(
 }
 
 object SparkException {
-  def internalError(msg: String): SparkException = {
+  def internalError(msg: String, context: Array[QueryContext], summary: 
String): SparkException = {
 new SparkException(
   errorClass = "INTERNAL_ERROR",
   messageParameters = Map("message" -> msg),
-  cause = null)
+  cause = null,
+  context,
+  summary)
+  }
+
+  def internalError(msg: String): SparkException = {
+internalError(msg, context = Array.empty[QueryContext], summary = "")
   }
 
   def internalError(msg: String, cause: Throwable): SparkException = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index f88ef522f34..285c7396124 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
@@ -713,9 +714,10 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
 extendedCheckRules.foreach(_(plan))
 plan.foreachUp {
   case o if !o.resolved =>
-o.failAnalysis(
-  errorClass = "_LEGACY_ERROR_TEMP_2442",
-  messageParameters = Map("operator" -> 
o.simpleString(SQLConf.get.maxToStringFields)))
+throw SparkException.internalError(
+  msg = s"Found the unresolved operator: 
${o.simpleString(SQLConf.get.maxToStringFields)}",
+  context = o.origin.getQueryContext,
+  summary = o.origin.context.summary)
   case _ =>
 }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 30116d410f7..93006224814 100644
--- 

[spark] branch master updated (4a36c151ea1 -> 649c87780e5)

2022-11-10 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 4a36c151ea1 [SPARK-41108][CONNECT] Control the max size of arrow batch
 add 649c87780e5 [SPARK-41059][SQL] Rename `_LEGACY_ERROR_TEMP_2420` to 
`NESTED_AGGREGATE_FUNCTION`

No new revisions were added by this update.

Summary of changes:
 core/src/main/resources/error/error-classes.json | 10 +-
 .../spark/sql/catalyst/analysis/CheckAnalysis.scala  |  2 +-
 .../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 16 +---
 .../src/test/resources/sql-tests/results/pivot.sql.out   |  2 +-
 .../results/postgreSQL/aggregates_part3.sql.out  |  2 +-
 .../results/udf/postgreSQL/udf-aggregates_part3.sql.out  |  2 +-
 .../resources/sql-tests/results/udf/udf-pivot.sql.out|  2 +-
 7 files changed, 19 insertions(+), 17 deletions(-)





[spark] branch master updated: [SPARK-41108][CONNECT] Control the max size of arrow batch

2022-11-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4a36c151ea1 [SPARK-41108][CONNECT] Control the max size of arrow batch
4a36c151ea1 is described below

commit 4a36c151ea1cf8372ddefbe0a75f3470bfbe1587
Author: Ruifeng Zheng 
AuthorDate: Fri Nov 11 14:35:31 2022 +0900

[SPARK-41108][CONNECT] Control the max size of arrow batch

### What changes were proposed in this pull request?

Control the max size of arrow batch

### Why are the changes needed?

As per the suggestion in
https://github.com/apache/spark/pull/38468#discussion_r1018951362.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
existing tests

Closes #38612 from zhengruifeng/connect_arrow_batchsize.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 .../service/SparkConnectStreamHandler.scala|  6 ++
 .../sql/execution/arrow/ArrowConverters.scala  | 25 --
 2 files changed, 20 insertions(+), 11 deletions(-)

diff --git 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index 3b734616b21..9652fce5425 100644
--- 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.arrow.ArrowConverters
 class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) 
extends Logging {
 
   // The maximum batch size in bytes for a single batch of data to be returned 
via proto.
-  val MAX_BATCH_SIZE: Long = 10 * 1024 * 1024
+  private val MAX_BATCH_SIZE: Long = 4 * 1024 * 1024
 
   def handle(v: Request): Unit = {
 val session =
@@ -127,8 +127,6 @@ class SparkConnectStreamHandler(responseObserver: 
StreamObserver[Response]) exte
   def processAsArrowBatches(clientId: String, dataframe: DataFrame): Unit = {
 val spark = dataframe.sparkSession
 val schema = dataframe.schema
-// TODO: control the batch size instead of max records
-val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
 val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
 
 SQLExecution.withNewExecutionId(dataframe.queryExecution, 
Some("collectArrow")) {
@@ -141,7 +139,7 @@ class SparkConnectStreamHandler(responseObserver: 
StreamObserver[Response]) exte
 
 val batches = rows.mapPartitionsInternal { iter =>
   ArrowConverters
-.toArrowBatchIterator(iter, schema, maxRecordsPerBatch, timeZoneId)
+.toBatchWithSchemaIterator(iter, schema, MAX_BATCH_SIZE, 
timeZoneId)
 }
 
 val signal = new Object
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index a2dce31bc6d..c233ac32c12 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -33,12 +33,12 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.ArrowUtils
 import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, 
ColumnVector}
-import org.apache.spark.util.{ByteBufferOutputStream, Utils}
+import org.apache.spark.util.{ByteBufferOutputStream, SizeEstimator, Utils}
 
 
 /**
@@ -128,10 +128,14 @@ private[sql] object ArrowConverters extends Logging {
 }
   }
 
-  private[sql] def toArrowBatchIterator(
+  /**
+   * Convert the input rows into fully contained arrow batches.
+   * Different from [[toBatchIterator]], each output arrow batch starts with 
the schema.
+   */
+  private[sql] def toBatchWithSchemaIterator(
   rowIter: Iterator[InternalRow],
   schema: StructType,
-  maxRecordsPerBatch: Int,
+  maxBatchSize: Long,
   timeZoneId: String): Iterator[(Array[Byte], Long)] = {
 val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
 val allocator = ArrowUtils.rootAllocator.newChildAllocator(
@@ -140,6 +144,7 @@ private[sql] object ArrowConverters extends 

[spark] branch master updated (2a8ed47aaeb -> 73bca6e5cac)

2022-11-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 2a8ed47aaeb [SPARK-41036][CONNECT][PYTHON] `columns` API should use 
`schema` API to avoid data fetching
 add 73bca6e5cac Revert "[SPARK-41063][BUILD] Clean all except files in Git 
repository before running Mima"

No new revisions were added by this update.

Summary of changes:
 dev/run-tests.py | 1 -
 1 file changed, 1 deletion(-)





[spark] branch master updated: [SPARK-41036][CONNECT][PYTHON] `columns` API should use `schema` API to avoid data fetching

2022-11-10 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2a8ed47aaeb [SPARK-41036][CONNECT][PYTHON] `columns` API should use 
`schema` API to avoid data fetching
2a8ed47aaeb is described below

commit 2a8ed47aaeb226dff45fe6cca38e30833bb0eae6
Author: Rui Wang 
AuthorDate: Fri Nov 11 11:22:03 2022 +0800

[SPARK-41036][CONNECT][PYTHON] `columns` API should use `schema` API to
avoid data fetching

### What changes were proposed in this pull request?

Currently, `columns` is implemented on top of `limit`, which runs a job to fetch
data and derives the schema from the collected result. A more efficient way is to
call the `schema` API, which only needs to analyze the plan without collecting any
data. This should be faster in most cases.
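
A hedged sketch of the difference for client code; `spark` and the table name are
assumptions, and the old path is kept only as a comment:

```
df = spark.read.table("test_table")   # hypothetical connect DataFrame

# Old path (removed here): collect zero rows just to learn the column names.
# names = df.limit(0).toPandas().columns.values

# New path: a single analyze round trip, no job and no data transfer.
names = df.columns                    # now backed by df.schema().names
print(names)                          # e.g. ['id', 'name']
```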

### Why are the changes needed?

Efficiency

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

UT

Closes #38546 from amaliujia/improve_python_columns.

Authored-by: Rui Wang 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/connect/dataframe.py| 9 ++---
 python/pyspark/sql/tests/connect/test_connect_basic.py | 5 +
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index 0c19c67309d..f2e528fc83c 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -140,13 +140,8 @@ class DataFrame(object):
 """Returns the list of columns of the current data frame."""
 if self._plan is None:
 return []
-if "columns" not in self._cache and self._plan is not None:
-pdd = self.limit(0).toPandas()
-if pdd is None:
-raise Exception("Empty result")
-# Translate to standard pytho array
-self._cache["columns"] = pdd.columns.values
-return self._cache["columns"]
+
+return self.schema().names
 
 def count(self) -> int:
 """Returns the number of rows in the data frame"""
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py 
b/python/pyspark/sql/tests/connect/test_connect_basic.py
index b2509c75dd6..3f75eb1b18f 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -106,6 +106,11 @@ class SparkConnectTests(SparkConnectSQLTestCase):
 # Check that the limit is applied
 self.assertEqual(len(data.index), 10)
 
+def test_columns(self):
+# SPARK-41036: test `columns` API for python client.
+columns = self.connect.read.table(self.tbl_name).columns
+self.assertEqual(["id", "name"], columns)
+
 def test_collect(self):
 df = self.connect.read.table(self.tbl_name)
 data = df.limit(10).collect()





[spark] branch master updated: [SPARK-41105][CONNECT] Adopt `optional` keyword from proto3 which offers `hasXXX` to differentiate if a field is set or unset

2022-11-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 46fab54b500 [SPARK-41105][CONNECT] Adopt `optional` keyword from 
proto3 which offers `hasXXX` to differentiate if a field is set or unset
46fab54b500 is described below

commit 46fab54b500c579cd421fb9e8ea95fae0ddda87d
Author: Rui Wang 
AuthorDate: Fri Nov 11 12:04:34 2022 +0900

[SPARK-41105][CONNECT] Adopt `optional` keyword from proto3 which offers 
`hasXXX` to differentiate if a field is set or unset

### What changes were proposed in this pull request?

We used to wrap such fields into messages to be able to tell whether a field is
set or unset. It turns out proto3 offers a built-in mechanism to achieve the same
thing:
https://developers.google.com/protocol-buffers/docs/proto3#specifying_field_rules.

It is as easy as adding the `optional` keyword to a field to auto-generate the
`hasXXX` method.

This PR refactors the existing protos to get rid of the redundant message
definitions.
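
For illustration, a hedged Python-side sketch (module path taken from the file
list in this commit) of what `optional` buys: presence checks on proto3 optional
scalars via `HasField`, mirroring the generated `hasXXX` accessors on the JVM side.

```
from pyspark.sql.connect.proto import relations_pb2

sample = relations_pb2.Sample()
print(sample.HasField("seed"))   # False: unset, so the planner can pick a random seed
sample.seed = 42
print(sample.HasField("seed"))   # True: an explicit seed was provided
```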

### Why are the changes needed?

Codebase simplification.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

Existing UT

Closes #38606 from amaliujia/refactor_proto.

Authored-by: Rui Wang 
Signed-off-by: Hyukjin Kwon 
---
 .../main/protobuf/spark/connect/relations.proto| 12 +---
 .../org/apache/spark/sql/connect/dsl/package.scala |  5 +-
 .../sql/connect/planner/SparkConnectPlanner.scala  |  4 +-
 python/pyspark/sql/connect/plan.py |  6 +-
 python/pyspark/sql/connect/proto/relations_pb2.py  | 40 ++--
 python/pyspark/sql/connect/proto/relations_pb2.pyi | 71 ++
 .../sql/tests/connect/test_connect_plan_only.py|  4 +-
 7 files changed, 61 insertions(+), 81 deletions(-)

diff --git a/connector/connect/src/main/protobuf/spark/connect/relations.proto 
b/connector/connect/src/main/protobuf/spark/connect/relations.proto
index 639d1bafce5..4f30b5bfbde 100644
--- a/connector/connect/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/relations.proto
@@ -215,11 +215,7 @@ message Sample {
   double lower_bound = 2;
   double upper_bound = 3;
   bool with_replacement = 4;
-  Seed seed = 5;
-
-  message Seed {
-int64 seed = 1;
-  }
+  optional int64 seed = 5;
 }
 
 // Relation of type [[Range]] that generates a sequence of integers.
@@ -232,11 +228,7 @@ message Range {
   int64 step = 3;
   // Optional. Default value is assigned by 1) SQL conf 
"spark.sql.leafNodeDefaultParallelism" if
   // it is set, or 2) spark default parallelism.
-  NumPartitions num_partitions = 4;
-
-  message NumPartitions {
-int32 num_partitions = 1;
-  }
+  optional int32 num_partitions = 4;
 }
 
 // Relation alias.
diff --git 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index 5e7a94da347..f55ed835d23 100644
--- 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -216,8 +216,7 @@ package object dsl {
   range.setStep(1L)
 }
 if (numPartitions.isDefined) {
-  range.setNumPartitions(
-
proto.Range.NumPartitions.newBuilder().setNumPartitions(numPartitions.get))
+  range.setNumPartitions(numPartitions.get)
 }
 Relation.newBuilder().setRange(range).build()
   }
@@ -376,7 +375,7 @@ package object dsl {
   .setUpperBound(upperBound)
   .setLowerBound(lowerBound)
   .setWithReplacement(withReplacement)
-  .setSeed(Sample.Seed.newBuilder().setSeed(seed).build())
+  .setSeed(seed)
   .build())
   .build()
   }
diff --git 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 04ce880a925..b91fef58a11 100644
--- 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -104,7 +104,7 @@ class SparkConnectPlanner(plan: proto.Relation, session: 
SparkSession) {
   rel.getLowerBound,
   rel.getUpperBound,
   rel.getWithReplacement,
-  if (rel.hasSeed) rel.getSeed.getSeed else Utils.random.nextLong,
+  if (rel.hasSeed) rel.getSeed else Utils.random.nextLong,
   transformRelation(rel.getInput))
   }
 
@@ -117,7 +117,7 @@ class SparkConnectPlanner(plan: proto.Relation, 

[spark] branch master updated (0cb79a4af27 -> 139b62c2999)

2022-11-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 0cb79a4af27 [SPARK-40281][PYTHON] Memory Profiler on Executors
 add 139b62c2999 [SPARK-41001][CONNECT] Make `userId` optional in 
SparkRemoteSession

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/client.py   | 69 --
 .../sql/tests/connect/test_connect_basic.py| 18 --
 2 files changed, 65 insertions(+), 22 deletions(-)





[spark] branch master updated: [SPARK-40281][PYTHON] Memory Profiler on Executors

2022-11-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0cb79a4af27 [SPARK-40281][PYTHON] Memory Profiler on Executors
0cb79a4af27 is described below

commit 0cb79a4af27eb226a4f7574aa1c8b5441583
Author: Xinrong Meng 
AuthorDate: Fri Nov 11 11:59:00 2022 +0900

[SPARK-40281][PYTHON] Memory Profiler on Executors

### What changes were proposed in this pull request?
Implement PySpark memory profiling on executors. The feature is enabled via 
a newly-introduced Spark configuration `spark.python.profile.memory`.

See the
[design doc](https://docs.google.com/document/d/e/2PACX-1vR2K4TdrM1eAjNDC1bsflCNRH67UWLoC-lCv6TSUVXD91Ruksm99pYTnCeIm7Ui3RgrrRNcQU_D8-oh/pub) for more details.

### Why are the changes needed?
There are many factors in a PySpark program’s performance. Memory, one of the
key factors, had been missing from PySpark profiling. A PySpark program on the
Spark driver can be profiled with [Memory
Profiler](https://pypi.org/project/memory-profiler/) as a normal Python process,
but there was no easy way to profile memory on Spark executors.

PySpark UDFs, one of the most popular Python APIs, enable users to run 
custom code on top of the Apache Spark™ engine. However, it is difficult to 
optimize UDFs without understanding memory consumption.

The PR proposes to introduce the PySpark memory profiler, which profiles memory
on executors. It provides information about total memory usage and pinpoints
which lines of code in a UDF contribute most to memory usage. That will help
optimize PySpark UDFs and reduce the likelihood of out-of-memory errors.
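
A hedged usage sketch (not taken from the patch): enable the new configuration,
run a Python UDF, and dump the collected profiles through the existing
`SparkContext.show_profiles()` entry point. It assumes the `memory-profiler`
package from `dev/requirements.txt` is installed, and the reporting call is an
assumption about how the results surface.

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = (
    SparkSession.builder
    # Configuration introduced by this PR.
    .config("spark.python.profile.memory", "true")
    .getOrCreate()
)

@udf("long")
def add_one(x):
    return x + 1

# Running the UDF collects per-line memory statistics on the executors.
spark.range(100).select(add_one("id")).collect()

# Existing profiler-collector entry point, assumed to dump the memory report too.
spark.sparkContext.show_profiles()
```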

### Does this PR introduce _any_ user-facing change?
No changes to existing user-facing behaviors.

A Spark configuration `spark.python.profile.memory` is introduced to enable 
the PySpark memory profiling feature.

### How was this patch tested?
- Unit tests.
- Manual tests on Jupyter notebooks as shown below:

![image](https://user-images.githubusercontent.com/47337188/200998618-73eb5bd1-83ba-4256-9ba4-f6fb4afcd1bd.png)

Closes #38584 from xinrong-meng/memory_profile.

Authored-by: Xinrong Meng 
Signed-off-by: Hyukjin Kwon 
---
 dev/requirements.txt |   1 +
 dev/sparktestsupport/modules.py  |   1 +
 python/pyspark/context.py|  22 +-
 python/pyspark/profiler.py   | 297 +--
 python/pyspark/rdd.py|   5 +-
 python/pyspark/sql/udf.py|  70 +--
 python/pyspark/tests/test_memory_profiler.py | 160 +++
 python/pyspark/tests/test_profiler.py|  57 -
 8 files changed, 570 insertions(+), 43 deletions(-)

diff --git a/dev/requirements.txt b/dev/requirements.txt
index 2f32066d6a8..914c26b1fa1 100644
--- a/dev/requirements.txt
+++ b/dev/requirements.txt
@@ -10,6 +10,7 @@ plotly
 mlflow>=1.0
 sklearn
 matplotlib<3.3.0
+memory-profiler==0.60.0
 
 # PySpark test dependencies
 unittest-xml-reporting
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index a439b4cbbed..159990fb33a 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -418,6 +418,7 @@ pyspark_core = Module(
 "pyspark.tests.test_daemon",
 "pyspark.tests.test_install_spark",
 "pyspark.tests.test_join",
+"pyspark.tests.test_memory_profiler",
 "pyspark.tests.test_profiler",
 "pyspark.tests.test_rdd",
 "pyspark.tests.test_rddbarrier",
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 45e683efe7a..9a7a8f46e84 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -67,7 +67,7 @@ from pyspark.rdd import RDD, _load_from_socket
 from pyspark.taskcontext import TaskContext
 from pyspark.traceback_utils import CallSite, first_spark_call
 from pyspark.status import StatusTracker
-from pyspark.profiler import ProfilerCollector, BasicProfiler, UDFBasicProfiler
+from pyspark.profiler import ProfilerCollector, BasicProfiler, 
UDFBasicProfiler, MemoryProfiler
 from py4j.java_gateway import is_instance_of, JavaGateway, JavaObject, JVMView
 
 if TYPE_CHECKING:
@@ -177,6 +177,7 @@ class SparkContext:
 jsc: Optional[JavaObject] = None,
 profiler_cls: Type[BasicProfiler] = BasicProfiler,
 udf_profiler_cls: Type[UDFBasicProfiler] = UDFBasicProfiler,
+memory_profiler_cls: Type[MemoryProfiler] = MemoryProfiler,
 ):
 
 if conf is None or conf.get("spark.executor.allowSparkContext", 
"false").lower() != "true":
@@ -204,6 +205,7 @@ class SparkContext:

[spark] branch master updated: [SPARK-41077][CONNECT][PYTHON][REFACTORING] Rename `ColumnRef` to `Column` in Python client implementation

2022-11-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7d320d784a2 [SPARK-41077][CONNECT][PYTHON][REFACTORING] Rename 
`ColumnRef` to `Column` in Python client implementation
7d320d784a2 is described below

commit 7d320d784a2d637fd1a8fd0798da3d2a39b4d7cd
Author: Rui Wang 
AuthorDate: Fri Nov 11 11:03:04 2022 +0900

[SPARK-41077][CONNECT][PYTHON][REFACTORING] Rename `ColumnRef` to `Column` 
in Python client implementation

### What changes were proposed in this pull request?

The Connect Python client uses `ColumnRef` to represent columns in its API (e.g.
`df.name`), while current PySpark uses the `Column` class for the same thing. We
can align Connect with PySpark, which helps existing PySpark users reuse their
code with the Spark Connect Python client as much as possible (minimizing code
changes).
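
A tiny hedged sketch of the kind of client code the rename keeps portable; `df`
is an assumed connect DataFrame with a `name` column.

```
from pyspark.sql.connect.column import Column   # was `ColumnRef` before this PR

expr = df.name                   # attribute access yields a Column, as in classic PySpark
assert isinstance(expr, Column)
```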

### Why are the changes needed?

This helps existing PySpark users reuse their code with the Spark Connect Python
client as much as possible (minimizing code changes).

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

Existing UT

Closes #38586 from amaliujia/SPARK-41077.

Authored-by: Rui Wang 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/column.py   | 12 
 python/pyspark/sql/connect/dataframe.py| 34 +++---
 python/pyspark/sql/connect/function_builder.py |  4 +--
 python/pyspark/sql/connect/functions.py|  6 ++--
 python/pyspark/sql/connect/plan.py | 14 -
 python/pyspark/sql/connect/typing/__init__.pyi |  4 +--
 .../connect/test_connect_column_expressions.py |  6 ++--
 7 files changed, 40 insertions(+), 40 deletions(-)

diff --git a/python/pyspark/sql/connect/column.py 
b/python/pyspark/sql/connect/column.py
index 3c9f8c3d736..417bc7097de 100644
--- a/python/pyspark/sql/connect/column.py
+++ b/python/pyspark/sql/connect/column.py
@@ -30,8 +30,8 @@ if TYPE_CHECKING:
 
 def _bin_op(
 name: str, doc: str = "binary function", reverse: bool = False
-) -> Callable[["ColumnRef", Any], "Expression"]:
-def _(self: "ColumnRef", other: Any) -> "Expression":
+) -> Callable[["Column", Any], "Expression"]:
+def _(self: "Column", other: Any) -> "Expression":
 if isinstance(other, get_args(PrimitiveType)):
 other = LiteralExpression(other)
 if not reverse:
@@ -163,15 +163,15 @@ class LiteralExpression(Expression):
 return f"Literal({self._value})"
 
 
-class ColumnRef(Expression):
+class Column(Expression):
 """Represents a column reference. There is no guarantee that this column
 actually exists. In the context of this project, we refer by its name and
 treat it as an unresolved attribute. Attributes that have the same fully
 qualified name are identical"""
 
 @classmethod
-def from_qualified_name(cls, name: str) -> "ColumnRef":
-return ColumnRef(name)
+def from_qualified_name(cls, name: str) -> "Column":
+return Column(name)
 
 def __init__(self, name: str) -> None:
 super().__init__()
@@ -198,7 +198,7 @@ class ColumnRef(Expression):
 
 
 class SortOrder(Expression):
-def __init__(self, col: ColumnRef, ascending: bool = True, nullsLast: bool 
= True) -> None:
+def __init__(self, col: Column, ascending: bool = True, nullsLast: bool = 
True) -> None:
 super().__init__()
 self.ref = col
 self.ascending = ascending
diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index e3116ea1250..0c19c67309d 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -31,7 +31,7 @@ import pandas
 
 import pyspark.sql.connect.plan as plan
 from pyspark.sql.connect.column import (
-ColumnRef,
+Column,
 Expression,
 LiteralExpression,
 )
@@ -44,7 +44,7 @@ if TYPE_CHECKING:
 from pyspark.sql.connect.typing import ColumnOrString, ExpressionOrString
 from pyspark.sql.connect.client import RemoteSparkSession
 
-ColumnOrName = Union[ColumnRef, str]
+ColumnOrName = Union[Column, str]
 
 
 class GroupingFrame(object):
@@ -52,9 +52,9 @@ class GroupingFrame(object):
 MeasuresType = Union[Sequence[Tuple["ExpressionOrString", str]], Dict[str, 
str]]
 OptMeasuresType = Optional[MeasuresType]
 
-def __init__(self, df: "DataFrame", *grouping_cols: Union[ColumnRef, str]) 
-> None:
+def __init__(self, df: "DataFrame", *grouping_cols: Union[Column, str]) -> 
None:
 self._df = df
-self._grouping_cols = [x if isinstance(x, ColumnRef) else df[x] for x 
in grouping_cols]
+self._grouping_cols = [x if isinstance(x, Column) else df[x] for x in 

[spark] branch master updated: [SPARK-41063][BUILD] Clean all except files in Git repository before running Mima

2022-11-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 82475b1cab3 [SPARK-41063][BUILD] Clean all except files in Git 
repository before running Mima
82475b1cab3 is described below

commit 82475b1cab37e7e3b4631e8cdb83741a101fdec1
Author: Hyukjin Kwon 
AuthorDate: Fri Nov 11 10:40:31 2022 +0900

[SPARK-41063][BUILD] Clean all except files in Git repository before 
running Mima

### What changes were proposed in this pull request?

This PR proposes to clean everything (except the files tracked in the Git
repository) before running MiMa.

### Why are the changes needed?

For an unknown reason, the compilation goes into an infinite loop in the Scala
2.13 build (see
https://github.com/apache/spark/actions/runs/3422432297/jobs/5699849548),
presumably because of leftovers from SBT or Zinc
(https://github.com/sbt/sbt/issues/6183).
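
For reference, a hedged standalone equivalent of the one-line change to
`dev/run-tests.py` shown in the diff below: wipe everything Git does not track,
including ignored build output, before invoking MiMa.

```
import subprocess

# -f: force removal, -x: also delete ignored files (stale SBT/Zinc output),
# -d: recurse into untracked directories.
subprocess.run(["git", "clean", "-fxd"], check=True)
```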

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

CI in this PR should test it out.

Closes #38599 from HyukjinKwon/SPARK-41063.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 dev/run-tests.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/dev/run-tests.py b/dev/run-tests.py
index fc34b4c61b0..366b814f859 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -315,6 +315,7 @@ def detect_binary_inop_with_mima(extra_profiles):
 "[info] Detecting binary incompatibilities with MiMa using SBT with 
these profiles: ",
 profiles,
 )
+run_cmd(["git", "clean", "-fxd"])  # See 
https://github.com/sbt/sbt/issues/6183
 run_cmd([os.path.join(SPARK_HOME, "dev", "mima"), profiles])
 
 





[spark] branch master updated (b5fbdeb5483 -> fed8acfcfc3)

2022-11-10 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from b5fbdeb5483 [SPARK-40901][CORE] Unable to store Spark Driver logs with 
Absolute Hadoop based URI FS Path
 add fed8acfcfc3 [SPARK-41005][CONNECT][PYTHON] Arrow-based collect

No new revisions were added by this update.

Summary of changes:
 .../src/main/protobuf/spark/connect/base.proto |   7 +-
 .../service/SparkConnectStreamHandler.scala| 105 +++--
 python/pyspark/sql/connect/client.py   |  11 ++-
 python/pyspark/sql/connect/proto/base_pb2.py   |  36 +++
 python/pyspark/sql/connect/proto/base_pb2.pyi  |  39 ++--
 .../sql/tests/connect/test_connect_basic.py|  12 +++
 .../sql/execution/arrow/ArrowConverters.scala  |  86 +
 7 files changed, 233 insertions(+), 63 deletions(-)





[spark] branch master updated: [SPARK-40901][CORE] Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path

2022-11-10 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b5fbdeb5483 [SPARK-40901][CORE] Unable to store Spark Driver logs with 
Absolute Hadoop based URI FS Path
b5fbdeb5483 is described below

commit b5fbdeb5483fa4b3c6102a99fa84a0677e145c42
Author: Swaminathan Balachandran 
AuthorDate: Thu Nov 10 18:16:44 2022 -0600

[SPARK-40901][CORE] Unable to store Spark Driver logs with Absolute Hadoop 
based URI FS Path

### What changes were proposed in this pull request?
Parsing the value of the `spark.driver.log.dfsDir` configuration as a Hadoop FS
`Path` and extracting the path from the URI fixes the problem.

### Why are the changes needed?
Currently, when an absolute URI is passed as the configured filesystem directory,
the code fails while trying to copy the local log file to that path.
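
A hedged illustration of the configuration shape this fix targets (config keys
as referenced in this patch and in `DriverLoggerSuite`; the HDFS URI is a
made-up example): an absolute Hadoop URI as the driver-log sync directory.

```
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.driver.log.persistToDfs.enabled", "true")
    # Absolute Hadoop URI; before this fix, syncing the local driver log to
    # such a path failed.
    .config("spark.driver.log.dfsDir", "hdfs://namenode:8020/var/log/spark-driver")
    .getOrCreate()
)
```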

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test cases

Closes #38377 from swamirishi/SPARK-40901.

Authored-by: Swaminathan Balachandran 
Signed-off-by: Mridul gmail.com>
---
 .../apache/spark/util/logging/DriverLogger.scala   |  8 ++---
 .../spark/util/logging/DriverLoggerSuite.scala | 42 +++---
 2 files changed, 41 insertions(+), 9 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala 
b/core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala
index bb57e032563..4f56cf24a2f 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala
@@ -126,13 +126,13 @@ private[spark] class DriverLogger(conf: SparkConf) 
extends Logging {
 throw new RuntimeException(s"${rootDir} does not exist." +
   s" Please create this dir in order to persist driver logs")
   }
-  val dfsLogFile: String = FileUtils.getFile(rootDir, appId
-+ DriverLogger.DRIVER_LOG_FILE_SUFFIX).getAbsolutePath()
+  val dfsLogFile: Path = fileSystem.makeQualified(new Path(rootDir, appId
++ DriverLogger.DRIVER_LOG_FILE_SUFFIX))
   try {
 inStream = new BufferedInputStream(new FileInputStream(localLogFile))
-outputStream = SparkHadoopUtil.createFile(fileSystem, new 
Path(dfsLogFile),
+outputStream = SparkHadoopUtil.createFile(fileSystem, dfsLogFile,
   conf.get(DRIVER_LOG_ALLOW_EC))
-fileSystem.setPermission(new Path(dfsLogFile), LOG_FILE_PERMISSIONS)
+fileSystem.setPermission(dfsLogFile, LOG_FILE_PERMISSIONS)
   } catch {
 case e: Exception =>
   JavaUtils.closeQuietly(inStream)
diff --git 
a/core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala 
b/core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala
index bd7ec242a93..9599bd29188 100644
--- a/core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util.logging
 import java.io.File
 
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark._
 import org.apache.spark.{SparkContext, SparkFunSuite}
@@ -65,12 +66,43 @@ class DriverLoggerSuite extends SparkFunSuite with 
LocalSparkContext {
 assert(dfsFile.length() > 0)
   }
 
+  test("SPARK-40901: driver logs are persisted locally and synced to dfs when 
log " +
+"dir is absolute URI") {
+val sparkConf = new SparkConf()
+sparkConf.set(DRIVER_LOG_DFS_DIR, "file://" + rootDfsDir.getAbsolutePath())
+val sc = getSparkContext(sparkConf)
+val app_id = sc.applicationId
+// Run a simple spark application
+sc.parallelize(1 to 1000).count()
+
+// Assert driver log file exists
+val rootDir = Utils.getLocalDir(sc.getConf)
+val driverLogsDir = FileUtils.getFile(rootDir, DriverLogger.DRIVER_LOG_DIR)
+assert(driverLogsDir.exists())
+val files = driverLogsDir.listFiles()
+assert(files.length === 1)
+assert(files(0).getName.equals(DriverLogger.DRIVER_LOG_FILE))
+
+sc.stop()
+assert(!driverLogsDir.exists())
+assert(sc.getConf.get(DRIVER_LOG_DFS_DIR).get.startsWith("file:///"))
+val dfsFile = new Path(sc.getConf.get(DRIVER_LOG_DFS_DIR).get +
+  "/" + app_id + DriverLogger.DRIVER_LOG_FILE_SUFFIX)
+val dfsFileStatus = 
dfsFile.getFileSystem(sc.hadoopConfiguration).getFileStatus(dfsFile)
+
+assert(dfsFileStatus.isFile)
+assert(dfsFileStatus.getLen > 0)
+  }
+
   private def getSparkContext(): SparkContext = {
-val conf = new SparkConf()
-conf.set(DRIVER_LOG_DFS_DIR, rootDfsDir.getAbsolutePath())
-conf.set(DRIVER_LOG_PERSISTTODFS, true)
-

[spark] branch master updated: [SPARK-41055][SQL] Rename `_LEGACY_ERROR_TEMP_2424` to `GROUP_BY_AGGREGATE`

2022-11-10 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6aac34315de [SPARK-41055][SQL] Rename `_LEGACY_ERROR_TEMP_2424` to 
`GROUP_BY_AGGREGATE`
6aac34315de is described below

commit 6aac34315de2ee3d48fe2e1819a02600b3b22d22
Author: itholic 
AuthorDate: Thu Nov 10 19:30:11 2022 +0300

[SPARK-41055][SQL] Rename `_LEGACY_ERROR_TEMP_2424` to `GROUP_BY_AGGREGATE`

### What changes were proposed in this pull request?

This PR proposes to rename `_LEGACY_ERROR_TEMP_2424` to `GROUP_BY_AGGREGATE`

### Why are the changes needed?

To use a proper error class name.
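
A hedged example (assuming an active `spark` session) of the query shape this
error class covers, mirroring the updated `group-by.sql` output below; exact
message wording may vary.

```
from pyspark.sql.utils import AnalysisException

try:
    # An aggregate function inside GROUP BY is rejected during analysis.
    spark.sql("SELECT count(b) FROM VALUES (1, 2) AS t(a, b) GROUP BY count(b)")
except AnalysisException as e:
    print(e)   # expected to reference error class GROUP_BY_AGGREGATE
```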

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

```
./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"
```

Closes #38569 from itholic/SPARK-41055.

Lead-authored-by: itholic 
Co-authored-by: Haejoon Lee <44108233+itho...@users.noreply.github.com>
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json   | 10 +-
 .../spark/sql/catalyst/analysis/CheckAnalysis.scala|  2 +-
 .../test/resources/sql-tests/results/group-by.sql.out  |  2 +-
 .../sql-tests/results/udf/udf-group-by.sql.out |  2 +-
 .../org/apache/spark/sql/DataFrameAggregateSuite.scala | 11 +++
 .../org/apache/spark/sql/DataFramePivotSuite.scala | 18 ++
 6 files changed, 25 insertions(+), 20 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 7c33c1059ae..dcc6effb30f 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -469,6 +469,11 @@
   "Grouping sets size cannot be greater than "
 ]
   },
+  "GROUP_BY_AGGREGATE" : {
+"message" : [
+  "Aggregate functions are not allowed in GROUP BY, but found ."
+]
+  },
   "GROUP_BY_POS_OUT_OF_RANGE" : {
 "message" : [
   "GROUP BY position  is not in select list (valid range is [1, 
])."
@@ -5008,11 +5013,6 @@
   "Correlated scalar subquery '' is neither present in the group 
by, nor in an aggregate function. Add it to group by using ordinal position or 
wrap it in first() (or first_value) if you don't care which value you get."
 ]
   },
-  "_LEGACY_ERROR_TEMP_2424" : {
-"message" : [
-  "aggregate functions are not allowed in GROUP BY, but found "
-]
-  },
   "_LEGACY_ERROR_TEMP_2425" : {
 "message" : [
   "expression  cannot be used as a grouping expression because 
its data type  is not an orderable data type."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 9e41bcebe47..1ce1fcd0144 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -413,7 +413,7 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
 def checkValidGroupingExprs(expr: Expression): Unit = {
   if (expr.exists(_.isInstanceOf[AggregateExpression])) {
 expr.failAnalysis(
-  errorClass = "_LEGACY_ERROR_TEMP_2424",
+  errorClass = "GROUP_BY_AGGREGATE",
   messageParameters = Map("sqlExpr" -> expr.sql))
   }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out 
b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
index 6ccc0c34ff0..1075a6ab887 100644
--- a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
@@ -213,7 +213,7 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "_LEGACY_ERROR_TEMP_2424",
+  "errorClass" : "GROUP_BY_AGGREGATE",
   "messageParameters" : {
 "sqlExpr" : "count(testdata.b)"
   },
diff --git 
a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out 
b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out
index 4d336adc412..093cdcac25a 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out
@@ -190,7 +190,7 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "_LEGACY_ERROR_TEMP_2424",
+  "errorClass" : "GROUP_BY_AGGREGATE",
   "messageParameters" : {
 "sqlExpr" : "CAST(udf(cast(count(b) as string)) AS BIGINT)"
   },
diff --git 

[spark] branch branch-3.3 updated: [SPARK-41089][YARN][SHUFFLE] Relocate Netty native arm64 libs

2022-11-10 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new bea58e4b863 [SPARK-41089][YARN][SHUFFLE] Relocate Netty native arm64 
libs
bea58e4b863 is described below

commit bea58e4b8634fbe3497431cb50351aa186107fb3
Author: Cheng Pan 
AuthorDate: Thu Nov 10 08:57:56 2022 -0600

[SPARK-41089][YARN][SHUFFLE] Relocate Netty native arm64 libs

### What changes were proposed in this pull request?

SPARK-27610 relocated the Netty x86 native libs. Recent Netty versions ship arm64
native libs as well, so we should do the same thing to make it work on the arm64
platform.

### Why are the changes needed?

Align arm64 behavior with x86.

### Does this PR introduce _any_ user-facing change?

Yes, bug fix for ARM64 platform.

### How was this patch tested?

Before patch
```
➜  apache-spark git:(SPARK-41089) ll 
common/network-yarn/target/exploded/META-INF/native
total 752
-rw-r--r--  1 chengpan  staff   101K Oct 11 23:24 
libnetty_transport_native_epoll_aarch_64.so
-rw-r--r--  1 chengpan  staff94K Oct 11 17:57 
libnetty_transport_native_kqueue_aarch_64.jnilib
-rw-r--r--  1 chengpan  staff93K Oct 11 23:27 
liborg_sparkproject_netty_transport_native_epoll_x86_64.so
-rw-r--r--  1 chengpan  staff77K Oct 11 17:51 
liborg_sparkproject_netty_transport_native_kqueue_x86_64.jnilib
drwxr-xr-x  3 chengpan  staff96B Nov  9 13:46 linux32
drwxr-xr-x  3 chengpan  staff96B Nov  9 13:46 linux64
drwxr-xr-x  3 chengpan  staff96B Nov  9 13:46 osx
drwxr-xr-x  3 chengpan  staff96B Nov  9 13:46 windows32
drwxr-xr-x  3 chengpan  staff96B Nov  9 13:46 windows64
```

After patch
```
➜  apache-spark git:(SPARK-41089) ll 
common/network-yarn/target/exploded/META-INF/native
total 752
-rw-r--r--  1 chengpan  staff   101K Oct 11 23:24 
liborg_sparkproject_netty_transport_native_epoll_aarch_64.so
-rw-r--r--  1 chengpan  staff93K Oct 11 23:27 
liborg_sparkproject_netty_transport_native_epoll_x86_64.so
-rw-r--r--  1 chengpan  staff94K Oct 11 17:57 
liborg_sparkproject_netty_transport_native_kqueue_aarch_64.jnilib
-rw-r--r--  1 chengpan  staff77K Oct 11 17:51 
liborg_sparkproject_netty_transport_native_kqueue_x86_64.jnilib
drwxr-xr-x  3 chengpan  staff96B Nov 10 12:07 linux32
drwxr-xr-x  3 chengpan  staff96B Nov 10 12:07 linux64
drwxr-xr-x  3 chengpan  staff96B Nov 10 12:07 osx
drwxr-xr-x  3 chengpan  staff96B Nov 10 12:07 windows32
drwxr-xr-x  3 chengpan  staff96B Nov 10 12:07 windows64
```

Closes #38593 from pan3793/SPARK-41089.

Authored-by: Cheng Pan 
Signed-off-by: Sean Owen 
(cherry picked from commit c72d39990182ad2207c8cdd523af06ee4dc02fc5)
Signed-off-by: Sean Owen 
---
 common/network-yarn/pom.xml | 4 
 1 file changed, 4 insertions(+)

diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 14d41802a8b..81146a36c98 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -174,6 +174,10 @@
   
tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_transport_native_epoll_x86_64.so"
 />
 
+
+
 
 
 





[spark] branch master updated (c5d27603f29 -> c72d3999018)

2022-11-10 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from c5d27603f29 [SPARK-41064][CONNECT][PYTHON] Implement 
`DataFrame.crosstab` and `DataFrame.stat.crosstab`
 add c72d3999018 [SPARK-41089][YARN][SHUFFLE] Relocate Netty native arm64 
libs

No new revisions were added by this update.

Summary of changes:
 common/network-yarn/pom.xml | 4 
 1 file changed, 4 insertions(+)





[spark] branch master updated: [SPARK-41064][CONNECT][PYTHON] Implement `DataFrame.crosstab` and `DataFrame.stat.crosstab`

2022-11-10 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c5d27603f29 [SPARK-41064][CONNECT][PYTHON] Implement 
`DataFrame.crosstab` and `DataFrame.stat.crosstab`
c5d27603f29 is described below

commit c5d27603f29437f1686cac70727594c19410a273
Author: Ruifeng Zheng 
AuthorDate: Thu Nov 10 18:15:54 2022 +0800

[SPARK-41064][CONNECT][PYTHON] Implement `DataFrame.crosstab` and 
`DataFrame.stat.crosstab`

### What changes were proposed in this pull request?
Implement `DataFrame.crosstab` and `DataFrame.stat.crosstab`
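
A hedged usage sketch (written against the classic PySpark API for illustration)
of the surface being added to the connect client: a pair-wise frequency
(contingency) table of two columns.

```
df = spark.createDataFrame([(1, "a"), (1, "b"), (2, "a")], ["k", "v"])
df.stat.crosstab("k", "v").show()
# +---+---+---+
# |k_v|  a|  b|   (row order may vary)
# +---+---+---+
# |  1|  1|  1|
# |  2|  1|  0|
# +---+---+---+
```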

### Why are the changes needed?
for api coverage

### Does this PR introduce _any_ user-facing change?
yes, new api

### How was this patch tested?
added ut

Closes #38578 from zhengruifeng/connect_df_crosstab.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 .../main/protobuf/spark/connect/relations.proto|  19 +++
 .../org/apache/spark/sql/connect/dsl/package.scala |  17 +++
 .../sql/connect/planner/SparkConnectPlanner.scala  |  10 ++
 .../connect/planner/SparkConnectProtoSuite.scala   |   6 +
 python/pyspark/sql/connect/dataframe.py|  62 ++
 python/pyspark/sql/connect/plan.py |  32 +
 python/pyspark/sql/connect/proto/relations_pb2.py  | 134 +++--
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  50 
 .../sql/tests/connect/test_connect_plan_only.py|  10 ++
 9 files changed, 274 insertions(+), 66 deletions(-)

diff --git a/connector/connect/src/main/protobuf/spark/connect/relations.proto 
b/connector/connect/src/main/protobuf/spark/connect/relations.proto
index b3613fc908d..639d1bafce5 100644
--- a/connector/connect/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/relations.proto
@@ -52,6 +52,7 @@ message Relation {
 
 // stat functions
 StatSummary summary = 100;
+StatCrosstab crosstab = 101;
 
 Unknown unknown = 999;
   }
@@ -284,6 +285,24 @@ message StatSummary {
   repeated string statistics = 2;
 }
 
+// Computes a pair-wise frequency table of the given columns. Also known as a 
contingency table.
+// It will invoke 'Dataset.stat.crosstab' (same as 
'StatFunctions.crossTabulate')
+// to compute the results.
+message StatCrosstab {
+  // (Required) The input relation.
+  Relation input = 1;
+
+  // (Required) The name of the first column.
+  //
+  // Distinct items will make the first item of each row.
+  string col1 = 2;
+
+  // (Required) The name of the second column.
+  //
+  // Distinct items will make the column names of the DataFrame.
+  string col2 = 3;
+}
+
 // Rename columns on the input relation by the same length of names.
 message RenameColumnsBySameLengthNames {
   // Required. The input relation.
diff --git 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index 381cbf7a9a8..5e7a94da347 100644
--- 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -227,6 +227,21 @@ package object dsl {
   }
 }
 
+implicit class DslStatFunctions(val logicalPlan: Relation) {
+  def crosstab(col1: String, col2: String): Relation = {
+Relation
+  .newBuilder()
+  .setCrosstab(
+proto.StatCrosstab
+  .newBuilder()
+  .setInput(logicalPlan)
+  .setCol1(col1)
+  .setCol2(col2)
+  .build())
+  .build()
+  }
+}
+
 implicit class DslLogicalPlan(val logicalPlan: Relation) {
   def select(exprs: Expression*): Relation = {
 Relation
@@ -463,6 +478,8 @@ package object dsl {
 
Repartition.newBuilder().setInput(logicalPlan).setNumPartitions(num).setShuffle(true))
   .build()
 
+  def stat: DslStatFunctions = new DslStatFunctions(logicalPlan)
+
   def summary(statistics: String*): Relation = {
 Relation
   .newBuilder()
diff --git 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 148f5569683..04ce880a925 100644
--- 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -67,6 +67,8 @@ class SparkConnectPlanner(plan: proto.Relation, session: 
SparkSession) {
 transformSubqueryAlias(rel.getSubqueryAlias)
   case proto.Relation.RelTypeCase.REPARTITION => 

[spark] branch master updated: [SPARK-41034][CONNECT][PYTHON][FOLLOW-UP] Fix mypy annotations test

2022-11-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c2a8e48e70a [SPARK-41034][CONNECT][PYTHON][FOLLOW-UP] Fix mypy 
annotations test
c2a8e48e70a is described below

commit c2a8e48e70abfb6bd101c99c5a0f6017151fc85e
Author: Ruifeng Zheng 
AuthorDate: Thu Nov 10 18:25:50 2022 +0900

[SPARK-41034][CONNECT][PYTHON][FOLLOW-UP] Fix mypy annotations test

### What changes were proposed in this pull request?
Fix python mypy

### Why are the changes needed?
https://github.com/apache/spark/pull/38541 breaks the linter

```
starting mypy annotations test...
annotations failed mypy checks:
python/pyspark/sql/connect/plan.py:683: error: Argument 1 to "plan" of 
"LogicalPlan" has incompatible type "Optional[RemoteSparkSession]"; expected 
"RemoteSparkSession"  [arg-type]
python/pyspark/sql/connect/plan.py:815: error: Argument 1 to "plan" of 
"LogicalPlan" has incompatible type "Optional[RemoteSparkSession]"; expected 
"RemoteSparkSession"  [arg-type]
Found 2 errors in 1 file (checked 369 source files)
1
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
local test

Closes #38597 from zhengruifeng/fix_py_lint_1.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/plan.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/connect/plan.py 
b/python/pyspark/sql/connect/plan.py
index db76d84fea1..047c9f2ce0f 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -677,7 +677,7 @@ class Repartition(LogicalPlan):
 self._num_partitions = num_partitions
 self._shuffle = shuffle
 
-def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation:
+def plan(self, session: "RemoteSparkSession") -> proto.Relation:
 rel = proto.Relation()
 if self._child is not None:
 rel.repartition.input.CopyFrom(self._child.plan(session))
@@ -809,7 +809,7 @@ class StatSummary(LogicalPlan):
 super().__init__(child)
 self.statistics = statistics
 
-def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation:
+def plan(self, session: "RemoteSparkSession") -> proto.Relation:
 assert self._child is not None
 plan = proto.Relation()
 plan.summary.input.CopyFrom(self._child.plan(session))





[spark] branch master updated: [SPARK-41092][SQL] Do not use identifier to match interval units

2022-11-10 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 012d99d3a3d [SPARK-41092][SQL] Do not use identifier to match interval 
units
012d99d3a3d is described below

commit 012d99d3a3d92819d5a26cddcfd566c46380b952
Author: Wenchen Fan 
AuthorDate: Thu Nov 10 11:36:21 2022 +0300

[SPARK-41092][SQL] Do not use identifier to match interval units

### What changes were proposed in this pull request?

The current ANTLR-based SQL parser is pretty fragile because we keep the ANTLR
parser rules very flexible and push more parsing logic to the Scala side
(`AstBuilder`). A tiny change to an ANTLR parser rule may break the parser
unexpectedly. As an example, in https://github.com/apache/spark/pull/38404 we
added a new parser rule to extend the INSERT syntax, and it broke interval
literals: `select b + interval '1 month' from values (1, 1)` can't be parsed after https://g [...]

This PR makes the interval literal parser rule stricter. Now it lists all 
the allowed interval units instead of matching an identifier. This fixes the 
issue we hit in https://github.com/apache/spark/pull/38404 .

In the future, we can revisit other parser rules and try to rely on antlr 
more to do the parsing work.
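
A hedged sanity check (assuming an active `spark` session) that interval literals
built from the unit keywords listed in this change still parse under the stricter
rule:

```
spark.sql("SELECT INTERVAL '1 month'").show()
# DAYS and HOURS are among the unit keywords now listed explicitly in the grammar.
spark.sql("SELECT INTERVAL 3 DAYS, INTERVAL 2 HOURS").show()
```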

### Why are the changes needed?

Fix the parser issue we hit in https://github.com/apache/spark/pull/38404.

### Does this PR introduce _any_ user-facing change?

The error message is changed a little bit for `SELECT INTERVAL 1 
wrong_unit`.

### How was this patch tested?

existing tests

Closes #38583 from cloud-fan/parser.

Authored-by: Wenchen Fan 
Signed-off-by: Max Gekk 
---
 docs/sql-ref-ansi-compliance.md| 11 +++
 .../spark/sql/catalyst/parser/SqlBaseLexer.g4  | 11 +++
 .../spark/sql/catalyst/parser/SqlBaseParser.g4 | 36 --
 .../sql-tests/results/ansi/interval.sql.out| 15 +++--
 .../resources/sql-tests/results/interval.sql.out   | 15 +++--
 5 files changed, 66 insertions(+), 22 deletions(-)

diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index a59d145d551..1501e14c604 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -407,6 +407,7 @@ Below is a list of all the keywords in Spark SQL.
 |DATEADD|non-reserved|non-reserved|non-reserved|
 |DATEDIFF|non-reserved|non-reserved|non-reserved|
 |DAY|non-reserved|non-reserved|non-reserved|
+|DAYS|non-reserved|non-reserved|non-reserved|
 |DAYOFYEAR|non-reserved|non-reserved|non-reserved|
 |DBPROPERTIES|non-reserved|non-reserved|non-reserved|
 |DEFAULT|non-reserved|non-reserved|non-reserved|
@@ -456,6 +457,7 @@ Below is a list of all the keywords in Spark SQL.
 |GROUPING|non-reserved|non-reserved|reserved|
 |HAVING|reserved|non-reserved|reserved|
 |HOUR|non-reserved|non-reserved|non-reserved|
+|HOURS|non-reserved|non-reserved|non-reserved|
 |IF|non-reserved|non-reserved|not a keyword|
 |IGNORE|non-reserved|non-reserved|non-reserved|
 |IMPORT|non-reserved|non-reserved|non-reserved|
@@ -495,13 +497,19 @@ Below is a list of all the keywords in Spark SQL.
 |MATCHED|non-reserved|non-reserved|non-reserved|
 |MERGE|non-reserved|non-reserved|non-reserved|
 |MICROSECOND|non-reserved|non-reserved|non-reserved|
+|MICROSECONDS|non-reserved|non-reserved|non-reserved|
 |MILLISECOND|non-reserved|non-reserved|non-reserved|
+|MILLISECONDS|non-reserved|non-reserved|non-reserved|
 |MINUTE|non-reserved|non-reserved|non-reserved|
+|MINUTES|non-reserved|non-reserved|non-reserved|
 |MINUS|non-reserved|strict-non-reserved|non-reserved|
 |MONTH|non-reserved|non-reserved|non-reserved|
+|MONTHS|non-reserved|non-reserved|non-reserved|
 |MSCK|non-reserved|non-reserved|non-reserved|
 |NAMESPACE|non-reserved|non-reserved|non-reserved|
 |NAMESPACES|non-reserved|non-reserved|non-reserved|
+|NANOSECOND|non-reserved|non-reserved|non-reserved|
+|NANOSECONDS|non-reserved|non-reserved|non-reserved|
 |NATURAL|reserved|strict-non-reserved|reserved|
 |NO|non-reserved|non-reserved|reserved|
 |NOT|reserved|non-reserved|reserved|
@@ -565,6 +573,7 @@ Below is a list of all the keywords in Spark SQL.
 |SCHEMA|non-reserved|non-reserved|non-reserved|
 |SCHEMAS|non-reserved|non-reserved|non-reserved|
 |SECOND|non-reserved|non-reserved|non-reserved|
+|SECONDS|non-reserved|non-reserved|non-reserved|
 |SELECT|reserved|non-reserved|reserved|
 |SEMI|non-reserved|strict-non-reserved|non-reserved|
 |SEPARATED|non-reserved|non-reserved|non-reserved|
@@ -631,10 +640,12 @@ Below is a list of all the keywords in Spark SQL.
 |VIEW|non-reserved|non-reserved|non-reserved|
 |VIEWS|non-reserved|non-reserved|non-reserved|
 |WEEK|non-reserved|non-reserved|non-reserved|