[spark] branch branch-3.4 updated: [SPARK-43522][SQL] Fix creating struct column name with index of array

2023-05-17 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new a18d71a1bc5 [SPARK-43522][SQL] Fix creating struct column name with 
index of array
a18d71a1bc5 is described below

commit a18d71a1bc5891bda91ad2c196768dfe86e49f30
Author: Jia Fan 
AuthorDate: Thu May 18 14:29:30 2023 +0800

[SPARK-43522][SQL] Fix creating struct column name with index of array

### What changes were proposed in this pull request?

When creating a struct column in a DataFrame, code that ran without
problems in version 3.3.1 no longer works in version 3.4.0.

In 3.3.1
```scala
val testDF = Seq("a=b,c=d,d=f").toDF
  .withColumn("key_value", split('value, ","))
  .withColumn("map_entry", transform(col("key_value"), x =>
    struct(split(x, "=").getItem(0), split(x, "=").getItem(1))))
testDF.show()

+-----------+---------------+--------------------+
|      value|      key_value|           map_entry|
+-----------+---------------+--------------------+
|a=b,c=d,d=f|[a=b, c=d, d=f]|[{a, b}, {c, d}, ...|
+-----------+---------------+--------------------+
```

In 3.4.0

```
org.apache.spark.sql.AnalysisException: [DATATYPE_MISMATCH.CREATE_NAMED_STRUCT_WITHOUT_FOLDABLE_STRING] Cannot resolve "struct(split(namedlambdavariable(), =, -1)[0], split(namedlambdavariable(), =, -1)[1])" due to data type mismatch: Only foldable `STRING` expressions are allowed to appear at odd position, but they are ["0", "1"].;
'Project [value#41, key_value#45, transform(key_value#45, lambdafunction(struct(0, split(lambda x_3#49, =, -1)[0], 1, split(lambda x_3#49, =, -1)[1]), lambda x_3#49, false)) AS map_entry#48]
+- Project [value#41, split(value#41, ,, -1) AS key_value#45]
   +- LocalRelation [value#41]
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.dataTypeMismatch(package.scala:73)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5(CheckAnalysis.scala:269)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5$adapted(CheckAnalysis.scala:256)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:295)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:294)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:294)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:294)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:294)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:294)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)

```

The reason is that `CreateNamedStruct` uses the last expression of each value `Expression`
as the column name, and checks that it is a `String`. For an array `Expression`, however,
the last expression is the `Integer` index, so the check fails. We can therefore skip the
match on `UnresolvedExtractValue` when the last expression is not a `String`, so that it
falls back to the default name.
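
As a point of reference, a user-level workaround on the affected version is to name the struct fields explicitly, so no name has to be derived from the array index. This is only a sketch; the field names `key`/`value` and the imports are illustrative:
```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// Sketch of a workaround for 3.4.0: aliasing each field gives CreateNamedStruct
// foldable STRING names up front instead of names derived from getItem(0)/getItem(1).
val workaroundDF = Seq("a=b,c=d,d=f").toDF
  .withColumn("key_value", split('value, ","))
  .withColumn("map_entry", transform(col("key_value"), x =>
    struct(split(x, "=").getItem(0).as("key"), split(x, "=").getItem(1).as("value"))))
```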

### Why are the changes needed?

Fix the bug when creating a struct column name from an array index.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add new test

Closes #41187 from Hisoka-X/SPARK-43522_struct_name_array.

Authored-by: Jia Fan 
Signed-off-by: Wenchen Fan 
(cherry picked from commit f2a29176de6e0b1628de6ca962cbf5036b145e0a)
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/expressions/complexTypeCreator.scala   | 2 +-
 sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala  | 8 
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
index ea871d453b3..20512191312 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
@@ -371,7 +

[spark] branch master updated (5e599953889 -> f2a29176de6)

2023-05-17 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 5e599953889 [SPARK-43157][SQL] Clone InMemoryRelation cached plan to 
prevent cloned plan from referencing same objects
 add f2a29176de6 [SPARK-43522][SQL] Fix creating struct column name with 
index of array

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/expressions/complexTypeCreator.scala   | 2 +-
 sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala  | 8 
 2 files changed, 9 insertions(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.4 updated: [SPARK-43157][SQL] Clone InMemoryRelation cached plan to prevent cloned plan from referencing same objects

2023-05-17 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 5cc4c5d2912 [SPARK-43157][SQL] Clone InMemoryRelation cached plan to 
prevent cloned plan from referencing same objects
5cc4c5d2912 is described below

commit 5cc4c5d2912a999e247f6c6f92c8ba858946594a
Author: Rob Reeves 
AuthorDate: Thu May 18 14:24:05 2023 +0800

[SPARK-43157][SQL] Clone InMemoryRelation cached plan to prevent cloned 
plan from referencing same objects

### What changes were proposed in this pull request?
This is the most narrow fix for the issue observed in SPARK-43157. It does 
not attempt to identify or solve all potential correctness and concurrency 
issues from TreeNode.tags being modified in multiple places. It solves the 
issue described in  SPARK-43157 by cloning the cached plan when populating 
`InMemoryRelation.innerChildren`. I chose to do the clone at this point to 
limit the scope to tree traversal used for building up the string 
representation of the plan, which is where we se [...]

Another solution I tried was to modify `InMemoryRelation.clone` to create a 
new `CachedRDDBuilder` and pass in a cloned `cachedPlan`. I opted not to go 
with this approach because `CachedRDDBuilder` has mutable state that needs to 
be moved to the new object and I didn't want to add that complexity if not 
needed.
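
A minimal sketch of the chosen idea (not the exact patch; it assumes the clone happens where `innerChildren` is populated from `cacheBuilder.cachedPlan`, and `clonedCachedPlan` is an illustrative name):
```scala
// Sketch only: expose a cloned copy of the cached plan as the inner child, so tree
// traversals (e.g. building the explain string) set TreeNode.tags on a private copy
// rather than on the plan shared by every clone of this InMemoryRelation.
@transient private lazy val clonedCachedPlan: SparkPlan = cacheBuilder.cachedPlan.clone()

override def innerChildren: Seq[SparkPlan] = Seq(clonedCachedPlan)
```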

### Why are the changes needed?
When caching is used the cached part of the SparkPlan is leaked to new 
clones of the plan. This leakage is an issue because if the TreeNode.tags are 
modified in one plan, it impacts the other plan. This is a correctness issue 
and a concurrency issue if the TreeNode.tags are set in different threads for 
the cloned plans.

See the description of 
[SPARK-43157](https://issues.apache.org/jira/browse/SPARK-43157) for an example 
of the concurrency issue.

### Does this PR introduce _any_ user-facing change?
Yes. It fixes a driver hanging issue the user can observe.

### How was this patch tested?
Unit test added and I manually verified `Dataset.explain("formatted")` 
still had the expected output.
```scala
spark.range(10).cache.filter($"id" > 5).explain("formatted")

== Physical Plan ==
* Filter (4)
+- InMemoryTableScan (1)
  +- InMemoryRelation (2)
+- * Range (3)

(1) InMemoryTableScan
Output [1]: [id#0L]
Arguments: [id#0L], [(id#0L > 5)]

(2) InMemoryRelation
Arguments: [id#0L], 
CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer418b946b,StorageLevel(disk,
 memory, deserialized, 1 replicas),*(1) Range (0, 10, step=1, splits=16)
,None), [id#0L ASC NULLS FIRST]

(3) Range [codegen id : 1]
Output [1]: [id#0L]
Arguments: Range (0, 10, step=1, splits=Some(16))

(4) Filter [codegen id : 1]
Input [1]: [id#0L]
Condition : (id#0L > 5)
```

I also verified that `InMemoryRelation.innerChildren` is cloned when the entire
plan is cloned.
```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import spark.implicits._

def findCacheOperator(plan: SparkPlan): Option[InMemoryTableScanExec] = {
  if (plan.isInstanceOf[InMemoryTableScanExec]) {
Some(plan.asInstanceOf[InMemoryTableScanExec])
  } else if (plan.children.isEmpty && plan.subqueries.isEmpty) {
None
  } else {
(plan.subqueries.flatMap(p => findCacheOperator(p)) ++
  plan.children.flatMap(findCacheOperator)).headOption
  }
}

val df = spark.range(10).filter($"id" < 100).cache()
val df1 = df.limit(1)
val df2 = df.limit(1)

// Get the cache operator (InMemoryTableScanExec) in each plan
val plan1 = findCacheOperator(df1.queryExecution.executedPlan).get
val plan2 = findCacheOperator(df2.queryExecution.executedPlan).get

// Check if InMemoryTableScanExec references point to the same object
println(plan1.eq(plan2))
// returns false

// Check if InMemoryRelation references point to the same object
println(plan1.relation.eq(plan2.relation))
// returns false

// Check if the cached SparkPlan references point to the same object

println(plan1.relation.innerChildren.head.eq(plan2.relation.innerChildren.head))
// returns false
// This shows the issue is fixed
```

Closes #40812 from robreeves/roreeves/explain_util.

Authored-by: Rob Reeves 
Signed-off-by: Wenchen Fan 
(cherry picked from commit 5e5999538899732bf3cdd04b974f1abeb949ccd0)
Signed-off-by: Wenchen Fan 
---
 .../sql/execution/columnar/InMemoryRelation.scala  | 12 ++-
 .../execution/columnar/InMemoryRelationSuite.scala | 37 

[spark] branch master updated: [SPARK-43157][SQL] Clone InMemoryRelation cached plan to prevent cloned plan from referencing same objects

2023-05-17 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5e599953889 [SPARK-43157][SQL] Clone InMemoryRelation cached plan to 
prevent cloned plan from referencing same objects
5e599953889 is described below

commit 5e5999538899732bf3cdd04b974f1abeb949ccd0
Author: Rob Reeves 
AuthorDate: Thu May 18 14:24:05 2023 +0800

[SPARK-43157][SQL] Clone InMemoryRelation cached plan to prevent cloned 
plan from referencing same objects

### What changes were proposed in this pull request?
This is the most narrow fix for the issue observed in SPARK-43157. It does 
not attempt to identify or solve all potential correctness and concurrency 
issues from TreeNode.tags being modified in multiple places. It solves the 
issue described in  SPARK-43157 by cloning the cached plan when populating 
`InMemoryRelation.innerChildren`. I chose to do the clone at this point to 
limit the scope to tree traversal used for building up the string 
representation of the plan, which is where we se [...]

Another solution I tried was to modify `InMemoryRelation.clone` to create a 
new `CachedRDDBuilder` and pass in a cloned `cachedPlan`. I opted not to go 
with this approach because `CachedRDDBuilder` has mutable state that needs to 
be moved to the new object and I didn't want to add that complexity if not 
needed.

### Why are the changes needed?
When caching is used the cached part of the SparkPlan is leaked to new 
clones of the plan. This leakage is an issue because if the TreeNode.tags are 
modified in one plan, it impacts the other plan. This is a correctness issue 
and a concurrency issue if the TreeNode.tags are set in different threads for 
the cloned plans.

See the description of 
[SPARK-43157](https://issues.apache.org/jira/browse/SPARK-43157) for an example 
of the concurrency issue.

### Does this PR introduce _any_ user-facing change?
Yes. It fixes a driver hanging issue the user can observe.

### How was this patch tested?
Unit test added and I manually verified `Dataset.explain("formatted")` 
still had the expected output.
```scala
spark.range(10).cache.filter($"id" > 5).explain("formatted")

== Physical Plan ==
* Filter (4)
+- InMemoryTableScan (1)
  +- InMemoryRelation (2)
+- * Range (3)

(1) InMemoryTableScan
Output [1]: [id#0L]
Arguments: [id#0L], [(id#0L > 5)]

(2) InMemoryRelation
Arguments: [id#0L], 
CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer418b946b,StorageLevel(disk,
 memory, deserialized, 1 replicas),*(1) Range (0, 10, step=1, splits=16)
,None), [id#0L ASC NULLS FIRST]

(3) Range [codegen id : 1]
Output [1]: [id#0L]
Arguments: Range (0, 10, step=1, splits=Some(16))

(4) Filter [codegen id : 1]
Input [1]: [id#0L]
Condition : (id#0L > 5)
```

I also verified that `InMemoryRelation.innerChildren` is cloned when the entire
plan is cloned.
```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import spark.implicits._

def findCacheOperator(plan: SparkPlan): Option[InMemoryTableScanExec] = {
  if (plan.isInstanceOf[InMemoryTableScanExec]) {
Some(plan.asInstanceOf[InMemoryTableScanExec])
  } else if (plan.children.isEmpty && plan.subqueries.isEmpty) {
None
  } else {
(plan.subqueries.flatMap(p => findCacheOperator(p)) ++
  plan.children.flatMap(findCacheOperator)).headOption
  }
}

val df = spark.range(10).filter($"id" < 100).cache()
val df1 = df.limit(1)
val df2 = df.limit(1)

// Get the cache operator (InMemoryTableScanExec) in each plan
val plan1 = findCacheOperator(df1.queryExecution.executedPlan).get
val plan2 = findCacheOperator(df2.queryExecution.executedPlan).get

// Check if InMemoryTableScanExec references point to the same object
println(plan1.eq(plan2))
// returns false

// Check if InMemoryRelation references point to the same object
println(plan1.relation.eq(plan2.relation))
// returns false

// Check if the cached SparkPlan references point to the same object

println(plan1.relation.innerChildren.head.eq(plan2.relation.innerChildren.head))
// returns false
// This shows the issue is fixed
```

Closes #40812 from robreeves/roreeves/explain_util.

Authored-by: Rob Reeves 
Signed-off-by: Wenchen Fan 
---
 .../sql/execution/columnar/InMemoryRelation.scala  | 12 ++-
 .../execution/columnar/InMemoryRelationSuite.scala | 37 ++
 2 files changed, 48 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apach

[spark] branch master updated: [SPARK-43413][SQL][FOLLOWUP] Show a directional message in ListQuery nullability assertion

2023-05-17 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 12f956cffd1 [SPARK-43413][SQL][FOLLOWUP] Show a directional message in 
ListQuery nullability assertion
12f956cffd1 is described below

commit 12f956cffd1dca81a145122310134ec8e57cb5aa
Author: Jack Chen 
AuthorDate: Wed May 17 23:02:34 2023 -0700

[SPARK-43413][SQL][FOLLOWUP] Show a directional message in ListQuery 
nullability assertion

### What changes were proposed in this pull request?
In case the assert for the call to ListQuery.nullable is hit, mention in 
the assert error message the conf flag that can be used to disable the assert. 
Follow-up to https://github.com/apache/spark/pull/41094#discussion_r1195438179
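
For reference, a hedged sketch (not part of the patch) of how the legacy behavior named in the new message could be restored from Spark-internal or test code:
```scala
// Sketch only: re-enable the legacy nullability behavior via the conf entry referenced
// in the assert message. SQLConf is an internal API, so outside Spark's own code one
// would set the corresponding key string directly.
import org.apache.spark.sql.internal.SQLConf

spark.conf.set(SQLConf.LEGACY_IN_SUBQUERY_NULLABILITY.key, "true")
```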

### Why are the changes needed?
Improve error message.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit tests

Closes #41202 from jchen5/in-nullability-assert.

Authored-by: Jack Chen 
Signed-off-by: Dongjoon Hyun 
---
 .../scala/org/apache/spark/sql/catalyst/expressions/subquery.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
index b0f10895c17..cd31b94905a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
@@ -372,7 +372,8 @@ case class ListQuery(
 // ListQuery can't be executed alone so its nullability is not defined.
 // Consider using ListQuery.childOutputs.exists(_.nullable)
 if (!SQLConf.get.getConf(SQLConf.LEGACY_IN_SUBQUERY_NULLABILITY)) {
-  assert(false, "ListQuery nullability is not defined")
+  assert(false, "ListQuery nullability is not defined. To restore the 
legacy behavior before " +
+s"Spark 3.5.0, set ${SQLConf.LEGACY_IN_SUBQUERY_NULLABILITY.key}=true")
 }
 false
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-43548][SS] Remove workaround for HADOOP-16255

2023-05-17 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a983591bbbf [SPARK-43548][SS] Remove workaround for HADOOP-16255
a983591bbbf is described below

commit a983591bbbf3ecd7b99b26d6b55590836121857b
Author: panbingkun 
AuthorDate: Wed May 17 22:41:53 2023 -0700

[SPARK-43548][SS] Remove workaround for HADOOP-16255

### What changes were proposed in this pull request?
The pr aims to remove the workaround for HADOOP-16255.
https://issues.apache.org/jira/browse/HADOOP-16255

### Why are the changes needed?
- Because HADOOP-16255 has been fixed after Hadoop version 3.1.2, and Spark supports
Hadoop versions >= 3.2.2 or >= 3.3.1.
- Makes the code cleaner.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

Closes #41209 from panbingkun/SPARK-43548.

Authored-by: panbingkun 
Signed-off-by: Dongjoon Hyun 
---
 .../sql/execution/streaming/CheckpointFileManager.scala| 14 --
 1 file changed, 14 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 6df0a2f3063..ad3212871fc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -370,20 +370,6 @@ class FileContextBasedCheckpointFileManager(path: Path, 
hadoopConf: Configuratio
   override def renameTempFile(srcPath: Path, dstPath: Path, 
overwriteIfPossible: Boolean): Unit = {
 import Options.Rename._
 fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
-// TODO: this is a workaround of HADOOP-16255 - remove this when 
HADOOP-16255 is resolved
-mayRemoveCrcFile(srcPath)
-  }
-
-  private def mayRemoveCrcFile(path: Path): Unit = {
-try {
-  val checksumFile = new Path(path.getParent, s".${path.getName}.crc")
-  if (exists(checksumFile)) {
-// checksum file exists, deleting it
-delete(checksumFile)
-  }
-} catch {
-  case NonFatal(_) => // ignore, we are removing crc file as "best-effort"
-}
   }
 }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-43509][PYTHON][CONNECT][FOLLOW-UP] Set SPARK_CONNECT_MODE_ENABLED when running pyspark shell with remote is local

2023-05-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 32bdf8a5b52 [SPARK-43509][PYTHON][CONNECT][FOLLOW-UP] Set 
SPARK_CONNECT_MODE_ENABLED when running pyspark shell with remote is local
32bdf8a5b52 is described below

commit 32bdf8a5b52ab1370dd009cd5967042c7ac20446
Author: Hyukjin Kwon 
AuthorDate: Thu May 18 14:35:39 2023 +0900

[SPARK-43509][PYTHON][CONNECT][FOLLOW-UP] Set SPARK_CONNECT_MODE_ENABLED 
when running pyspark shell with remote is local

### What changes were proposed in this pull request?

This PR is a followup of https://github.com/apache/spark/pull/41013 that 
sets `SPARK_CONNECT_MODE_ENABLED` when running PySpark shell via `bin/pyspark 
--remote=local` so it works.

### Why are the changes needed?

It is broken after the PR:

```bash
./bin/pyspark --remote local
```

```python
...
Traceback (most recent call last):
  File "/.../python/pyspark/shell.py", line 78, in <module>
    sc = spark.sparkContext
  File "/.../python/pyspark/sql/connect/session.py", line 567, in sparkContext
    raise PySparkNotImplementedError(
pyspark.errors.exceptions.base.PySparkNotImplementedError: [NOT_IMPLEMENTED] sparkContext() is not implemented.
```

### Does this PR introduce _any_ user-facing change?

No to end users.

Yes to the dev as described above.

### How was this patch tested?

Manually tested via `./bin/pyspark --remote local`

Closes #41206 from HyukjinKwon/SPARK-43509-followup.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 .../org/apache/spark/launcher/SparkSubmitCommandBuilder.java | 1 +
 python/pyspark/shell.py  | 9 +
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git 
a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
 
b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index 289eb31db31..9618673b4ce 100644
--- 
a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ 
b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -363,6 +363,7 @@ class SparkSubmitCommandBuilder extends 
AbstractCommandBuilder {
 }
 if (remoteStr != null) {
   env.put("SPARK_REMOTE", remoteStr);
+  env.put("SPARK_CONNECT_MODE_ENABLED", "1");
 }
 
 if (!isEmpty(pyOpts)) {
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index 86f576a3029..7e2093c1d31 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -100,10 +100,11 @@ print(
 % (platform.python_version(), platform.python_build()[0], 
platform.python_build()[1])
 )
 if is_remote():
-print(
-"Client connected to the Spark Connect server at %s"
-% urlparse(os.environ["SPARK_REMOTE"]).netloc
-)
+url = os.environ.get("SPARK_REMOTE", None)
+assert url is not None
+if url.startswith("local"):
+url = "sc://localhost"  # only for display in the console.
+print("Client connected to the Spark Connect server at %s" % 
urlparse(url).netloc)
 else:
 print("Spark context Web UI available at %s" % (sc.uiWebUrl))  # type: 
ignore[union-attr]
 print(


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.4 updated (f89e52037bf -> b0f5a7abda7)

2023-05-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


from f89e52037bf [SPARK-42826][3.4][FOLLOWUP][PS][DOCS] Update migration 
notes for pandas API on Spark
 add b0f5a7abda7 [SPARK-43547][3.4][PS][DOCS] Update "Supported Pandas API" 
page to point out the proper pandas docs

No new revisions were added by this update.

Summary of changes:
 python/pyspark/pandas/supported_api_gen.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.4 updated: [SPARK-42826][3.4][FOLLOWUP][PS][DOCS] Update migration notes for pandas API on Spark

2023-05-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new f89e52037bf [SPARK-42826][3.4][FOLLOWUP][PS][DOCS] Update migration 
notes for pandas API on Spark
f89e52037bf is described below

commit f89e52037bf6fcd560fe4698de427b57ae1c36eb
Author: itholic 
AuthorDate: Thu May 18 12:02:26 2023 +0900

[SPARK-42826][3.4][FOLLOWUP][PS][DOCS] Update migration notes for pandas 
API on Spark

### What changes were proposed in this pull request?

This is a follow-up to https://github.com/apache/spark/pull/40459 to fix incorrect
information and to elaborate on the changes in more detail.
- We do not fully support pandas 2.0.0, so the statement "Pandas API on Spark follows
for the pandas 2.0" is not correct.
- We should list all the APIs that no longer support the `inplace` parameter.

### Why are the changes needed?

Correctness for migration notes.

### Does this PR introduce _any_ user-facing change?

No, only updating migration notes.

### How was this patch tested?

The existing CI should pass

Closes #41207 from itholic/migration_guide_followup.

Authored-by: itholic 
Signed-off-by: Hyukjin Kwon 
---
 python/docs/source/migration_guide/pyspark_upgrade.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst 
b/python/docs/source/migration_guide/pyspark_upgrade.rst
index 7513d64ef6c..f570f8e9dfb 100644
--- a/python/docs/source/migration_guide/pyspark_upgrade.rst
+++ b/python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -33,8 +33,8 @@ Upgrading from PySpark 3.3 to 3.4
 * In Spark 3.4, the ``Series.concat`` sort parameter will be respected to 
follow pandas 1.4 behaviors.
 * In Spark 3.4, the ``DataFrame.__setitem__`` will make a copy and replace 
pre-existing arrays, which will NOT be over-written to follow pandas 1.4 
behaviors.
 * In Spark 3.4, the ``SparkSession.sql`` and the Pandas on Spark API ``sql`` 
have got new parameter ``args`` which provides binding of named parameters to 
their SQL literals.
-* In Spark 3.4, Pandas API on Spark follows for the pandas 2.0, and some APIs 
were deprecated or removed in Spark 3.4 according to the changes made in pandas 
2.0. Please refer to the [release notes of 
pandas](https://pandas.pydata.org/docs/dev/whatsnew/) for more details.
 * In Spark 3.4, the custom monkey-patch of ``collections.namedtuple`` was 
removed, and ``cloudpickle`` was used by default. To restore the previous 
behavior for any relevant pickling issue of ``collections.namedtuple``, set 
``PYSPARK_ENABLE_NAMEDTUPLE_PATCH`` environment variable to ``1``.
+* In Spark 3.4, the ``inplace`` parameter is no longer supported for Pandas 
API on Spark API ``add_categories``, ``remove_categories``, 
``remove_unused_categories``, ``rename_categories``, ``reorder_categories``, 
``set_categories`` to follow pandas 2.0.0 behaviors.
 
 
 Upgrading from PySpark 3.2 to 3.3


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-43022][CONNECT] Support protobuf functions for Scala client

2023-05-17 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2dd112ec137 [SPARK-43022][CONNECT] Support protobuf functions for 
Scala client
2dd112ec137 is described below

commit 2dd112ec1372346ff573e5c273cc5cb93bda033a
Author: yangjie01 
AuthorDate: Thu May 18 10:06:05 2023 +0800

[SPARK-43022][CONNECT] Support protobuf functions for Scala client

### What changes were proposed in this pull request?
This pr aims to support protobuf functions for the Scala client.
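
A hedged usage sketch of the new client-side API (the descriptor path, message name, and `df` are illustrative; the signatures mirror the existing `org.apache.spark.sql.protobuf.functions`):
```scala
// Sketch only: round-trip a binary Protobuf column through the Connect Scala client.
import org.apache.spark.sql.protobuf.functions.{from_protobuf, to_protobuf}
import spark.implicits._

val descFilePath = "/path/to/common.desc"   // hypothetical descriptor file
val parsed = df.select(
  from_protobuf($"payload", "SimpleMessage", descFilePath).as("event"))
val reencoded = parsed.select(
  to_protobuf($"event", "SimpleMessage", descFilePath).as("payload"))
```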

### Why are the changes needed?
Add Spark Connect JVM client API coverage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Add new test
- Checked Scala 2.13

Closes #40654 from LuciferYang/protobuf-functions.

Lead-authored-by: yangjie01 
Co-authored-by: YangJie 
Signed-off-by: yangjie01 
---
 .../org/apache/spark/sql/protobuf/functions.scala  | 202 +
 .../org/apache/spark/sql/FunctionTestSuite.scala   |  11 ++
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  63 +++
 .../src/test/resources/protobuf-tests/common.desc  |  12 ++
 .../from_protobuf_messageClassName.explain |   2 +
 ..._protobuf_messageClassName_descFilePath.explain |   2 +
 ...f_messageClassName_descFilePath_options.explain |   2 +
 .../from_protobuf_messageClassName_options.explain |   2 +
 .../to_protobuf_messageClassName.explain   |   2 +
 ..._protobuf_messageClassName_descFilePath.explain |   2 +
 ...f_messageClassName_descFilePath_options.explain |   2 +
 .../to_protobuf_messageClassName_options.explain   |   2 +
 .../queries/from_protobuf_messageClassName.json|  29 +++
 .../from_protobuf_messageClassName.proto.bin   | Bin 0 -> 125 bytes
 ...rom_protobuf_messageClassName_descFilePath.json |  33 
 ...rotobuf_messageClassName_descFilePath.proto.bin | Bin 0 -> 156 bytes
 ...obuf_messageClassName_descFilePath_options.json |  46 +
 ...messageClassName_descFilePath_options.proto.bin | Bin 0 -> 206 bytes
 .../from_protobuf_messageClassName_options.json|  42 +
 ...rom_protobuf_messageClassName_options.proto.bin | Bin 0 -> 174 bytes
 .../queries/to_protobuf_messageClassName.json  |  29 +++
 .../queries/to_protobuf_messageClassName.proto.bin | Bin 0 -> 123 bytes
 .../to_protobuf_messageClassName_descFilePath.json |  33 
 ...rotobuf_messageClassName_descFilePath.proto.bin | Bin 0 -> 154 bytes
 ...obuf_messageClassName_descFilePath_options.json |  46 +
 ...messageClassName_descFilePath_options.proto.bin | Bin 0 -> 204 bytes
 .../to_protobuf_messageClassName_options.json  |  42 +
 .../to_protobuf_messageClassName_options.proto.bin | Bin 0 -> 172 bytes
 connector/connect/server/pom.xml   |   6 +
 .../sql/connect/planner/SparkConnectPlanner.scala  |  59 ++
 .../sql/protobuf/CatalystDataToProtobuf.scala  |   2 +-
 .../sql/protobuf/ProtobufDataToCatalyst.scala  |   2 +-
 32 files changed, 671 insertions(+), 2 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala
new file mode 100644
index 000..c42f8417155
--- /dev/null
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.protobuf
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.functions.{fnWithOptions, lit}
+
+// scalastyle:off: object.name
+object functions {
+  // scalastyle:on: object.name
+
+  /**
+   * Converts a binary column of Protobuf format into its corresponding 
catalyst value. The
+   * Protobuf definition is provided through Protobuf descriptor file.
+   *
+   * @param data
+   *   the binary column.
+   * @param messageName
+   *   the pro

[spark] branch master updated: [SPARK-43436][BUILD] Upgrade rocksdbjni to 8.1.1.1

2023-05-17 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e8c347b5499 [SPARK-43436][BUILD] Upgrade rocksdbjni to 8.1.1.1
e8c347b5499 is described below

commit e8c347b549901bec5c0a1f5d5d3e5430adbe69b5
Author: YangJie 
AuthorDate: Wed May 17 13:30:08 2023 -0700

[SPARK-43436][BUILD] Upgrade rocksdbjni to 8.1.1.1

### What changes were proposed in this pull request?
This pr aims to upgrade rocksdbjni from 8.0.0 to 8.1.1.1.

### Why are the changes needed?
The full release notes are as follows:

- https://github.com/facebook/rocksdb/releases/tag/v8.1.1

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass GitHub Actions
- Manual test `RocksDBBenchmark`:

**8.0.0**

```
[INFO] Running org.apache.spark.util.kvstore.RocksDBBenchmark
                                        count  mean    min     max      95th
dbClose                                 4      0.339   0.256   0.532    0.532
dbCreation                              4      69.637  4.128   268.371  268.371
naturalIndexCreateIterator              1024   0.005   0.002   1.581    0.006
naturalIndexDescendingCreateIterator    1024   0.005   0.005   0.067    0.007
naturalIndexDescendingIteration         1024   0.006   0.004   0.029    0.008
naturalIndexIteration                   1024   0.006   0.004   0.069    0.009
randomDeleteIndexed                     1024   0.026   0.018   0.200    0.035
randomDeletesNoIndex                    1024   0.015   0.012   0.050    0.018
randomUpdatesIndexed                    1024   0.088   0.033   33.234   0.096
randomUpdatesNoIndex                    1024   0.035   0.032   0.497    0.039
randomWritesIndexed                     1024   0.129   0.035   56.362   0.121
randomWritesNoIndex                     1024   0.041   0.034   1.415    0.047
refIndexCreateIterator                  1024   0.005   0.004   0.029    0.006
refIndexDescendingCreateIterator        1024   0.003   0.003   0.028    0.004
refIndexDescendingIteration             1024   0.007   0.005   0.044    0.008
refIndexIteration                       1024   0.008   0.005   0.367    0.010
sequentialDeleteIndexed                 1024   0.023   0.017   1.716    0.025
sequentialDeleteNoIndex                 1024   0.015   0.012   0.043    0.017
sequentialUpdatesIndexed                1024   0.045   0.037   0.943    0.054
sequentialUpdatesNoIndex                1024   0.041   0.029   0.838    0.053
sequentialWritesIndexed                 1024   0.051   0.043   2.945    0.056
sequentialWritesNoIndex                 1024   0.037   0.030   2.828    0.042
```

**8.1.1.1**

```
[INFO] Running org.apache.spark.util.kvstore.RocksDBBenchmark
                                        count  mean    min     max      95th
dbClose                                 4      0.319   0.233   0.514    0.514
dbCreation                              4      69.826  3.767   268.894  268.894
naturalIndexCreateIterator              1024   0.005   0.002   1.409    0.007
naturalIndexDescendingCreateIterator    1024   0.005   0.005   0.062    0.007
naturalIndexDescendingIteration         1024   0.006   0.004   0.026    0.008
naturalIndexIteration                   1024   0.007   0.004   0.182    0.011
randomDeleteIndexed                     1024   0.027   0.020   0.259    0.037
randomDeletesNoIndex                    1024   0.016   0.013   0.041    0.019
randomUpdatesIndexed                    1024   0.087   0.033   29.514   0.102
randomUpdatesNoIndex                    1024   0.036   0.033   0.495    0.040
randomWritesIndexed                     1024   0.120   0.033   53.330   0.126
randomWritesNoIndex                     1024   0.041   0.035   1.403    0.047
refIndexCreateIterator                  1024   0.006   0.005   0.019    0.007
refIndexDescendingCreateIterator        1024   0.003   0.003   0.028    0.005
refIndexDescendingIteration             1024   0.007   0.005   0.053    0.008
refIndexIteration                       1024   0.008   0.006   0.254    0.010
sequentialDeleteIndexed                 1024   0.023   0.018   1.237    0.027
sequentialDeleteNoIndex                 1024   0.015   0.013   0.043    0.018
seque

[spark] branch master updated: [SPARK-43537][INFA][BUILD] Upgrading the ASM dependencies used in the `tools` module to 9.4

2023-05-17 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9785353684b [SPARK-43537][INFA][BUILD] Upgrading the ASM dependencies 
used in the `tools` module to 9.4
9785353684b is described below

commit 9785353684bdc2a2c7445b7e6b9ab85154f6933f
Author: yangjie01 
AuthorDate: Wed May 17 11:18:14 2023 -0500

[SPARK-43537][INFA][BUILD] Upgrading the ASM dependencies used in the 
`tools` module to 9.4

### What changes were proposed in this pull request?
This pr aims to upgrade the ASM-related dependencies in the `tools` module from
version 7.1 to version 9.4, so that `GenerateMIMAIgnore` can process Java 17+
compiled code.

Additionally, this pr defines `asm.version` to manage versions of ASM.

### Why are the changes needed?
The classpath processed by `GenerateMIMAIgnore` currently cannot contain Java 17+
compiled code because the ASM version used by the `tools` module is too low. Since
https://github.com/bmc/classutil has not been updated for a long time, we cannot
solve the problem by upgrading `classutil`, so this pr makes the `tools` module
explicitly depend on ASM 9.4 as a workaround.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Pass GitHub Action
- Manually checked `dev/mima`, since this pr upgrades a dependency of the tools module

```
dev/mima
```

and

```
dev/change-scala-version.sh 2.13
dev/mima -Pscala-2.13
```

- A case that can reproduce the problem: run the following script on the master branch:

```
set -o pipefail
set -e

FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
cd "$FWDIR"
export SPARK_HOME=$FWDIR
echo $SPARK_HOME

if [[ -x "$JAVA_HOME/bin/java" ]]; then
  JAVA_CMD="$JAVA_HOME/bin/java"
else
  JAVA_CMD=java
fi

TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export 
tools/fullClasspath" | grep jar | tail -n1)"
ASSEMBLY_CLASSPATH="$(build/sbt -DcopyDependencies=false "export 
assembly/fullClasspath" | grep jar | tail -n1)"

rm -f .generated-mima*

$JAVA_CMD \
  -Xmx2g \
  -XX:+IgnoreUnrecognizedVMOptions 
--add-opens=java.base/java.util.jar=ALL-UNNAMED \
  -cp "$TOOLS_CLASSPATH:$ASSEMBLY_CLASSPATH" \
  org.apache.spark.tools.GenerateMIMAIgnore

rm -f .generated-mima*
```

**Before**

```
Exception in thread "main" java.lang.IllegalArgumentException: Unsupported 
class file major version 61
  at org.objectweb.asm.ClassReader.(ClassReader.java:195)
  at org.objectweb.asm.ClassReader.(ClassReader.java:176)
  at org.objectweb.asm.ClassReader.(ClassReader.java:162)
  at org.objectweb.asm.ClassReader.(ClassReader.java:283)
  at org.clapper.classutil.asm.ClassFile$.load(ClassFinderImpl.scala:222)
  at org.clapper.classutil.ClassFinder.classData(ClassFinder.scala:404)
  at 
org.clapper.classutil.ClassFinder.$anonfun$processOpenZip$2(ClassFinder.scala:359)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
  at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
  at scala.collection.Iterator.toStream(Iterator.scala:1417)
  at scala.collection.Iterator.toStream$(Iterator.scala:1416)
  at scala.collection.AbstractIterator.toStream(Iterator.scala:1431)
  at scala.collection.Iterator.$anonfun$toStream$1(Iterator.scala:1417)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
  at 
scala.collection.immutable.Stream.$anonfun$$plus$plus$1(Stream.scala:372)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
  at 
scala.collection.immutable.Stream.$anonfun$$plus$plus$1(Stream.scala:372)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
  at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
  at scala.collection.immutable.Stream.filterImpl(Stream.scala:506)
  at 
scala.collection.immutable.Stream$.$anonfun$filteredTail$1(Stream.scala:1260)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
  at 
scala.collection.immutable.Stream$.$anonfun$filteredTail$1(Stream.scala:1260)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
  at scala.colle

[spark] branch master updated (8287a003512 -> 4dbcac95d5a)

2023-05-17 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 8287a003512 [MINOR][DOC] Refine doc for `Column.over`
 add 4dbcac95d5a [SPARK-43535][BUILD] Adjust the ImportOrderChecker rule to 
resolve long-standing import order issues

No new revisions were added by this update.

Summary of changes:
 .../main/scala/org/apache/spark/sql/application/ConnectRepl.scala   | 6 --
 .../src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala| 2 +-
 .../jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala  | 2 +-
 .../apache/spark/sql/connect/client/SparkConnectClientSuite.scala   | 3 ++-
 .../main/scala/org/apache/spark/sql/connect/common/UdfPacket.scala  | 3 ++-
 .../spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala | 6 --
 .../main/scala/org/apache/spark/metrics/source/JVMCPUSource.scala   | 5 +++--
 .../test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala   | 3 ++-
 .../src/test/scala/org/apache/spark/ml/linalg/MatricesSuite.scala   | 3 ++-
 .../org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala | 4 ++--
 .../spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala  | 5 +++--
 .../scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala   | 3 ++-
 .../test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala| 3 ++-
 .../cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala   | 3 ++-
 .../scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala   | 3 ++-
 .../scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala | 3 ++-
 .../spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala | 3 ++-
 scalastyle-config.xml   | 2 +-
 .../org/apache/spark/sql/catalyst/parser/ParserUtilsSuite.scala | 3 ++-
 19 files changed, 41 insertions(+), 24 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (d44e073f0cd -> 8287a003512)

2023-05-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from d44e073f0cd [SPARK-43128][CONNECT][SS] Make `recentProgress` and 
`lastProgress` return `StreamingQueryProgress` consistent with the native Scala 
Api
 add 8287a003512 [MINOR][DOC] Refine doc for `Column.over`

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/column.py | 14 ++
 1 file changed, 10 insertions(+), 4 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

2023-05-17 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d44e073f0cd [SPARK-43128][CONNECT][SS] Make `recentProgress` and 
`lastProgress` return `StreamingQueryProgress` consistent with the native Scala 
Api
d44e073f0cd is described below

commit d44e073f0cdaf16028a4854e79db200a4e39a6fe
Author: yangjie01 
AuthorDate: Wed May 17 16:27:03 2023 +0900

[SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return 
`StreamingQueryProgress` consistent with the native Scala Api

### What changes were proposed in this pull request?
This pr adds support for making `recentProgress` and `lastProgress` in
`RemoteStreamingQuery` return `StreamingQueryProgress` instances, consistent with
the native Scala API.
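
A hedged usage sketch (the query below is illustrative) of what the typed result enables compared with the previous raw-JSON wrapper:
```scala
// Sketch only: lastProgress/recentProgress now return typed StreamingQueryProgress
// objects, so fields can be read directly instead of parsing the JSON string by hand.
import org.apache.spark.sql.streaming.StreamingQueryProgress

val query = spark.readStream.format("rate").load()
  .writeStream.format("console").start()

val progress: StreamingQueryProgress = query.lastProgress   // null until a batch completes
if (progress != null) {
  println(s"batch=${progress.batchId}, rows/s=${progress.processedRowsPerSecond}")
}
```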

### Why are the changes needed?
Add Spark Connect JVM client API coverage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add new check to `StreamingQuerySuite`

Closes #40892 from LuciferYang/SPARK-43128.

Lead-authored-by: yangjie01 
Co-authored-by: YangJie 
Co-authored-by: Wenchen Fan 
Co-authored-by: Ruifeng Zheng 
Co-authored-by: Jungtaek Lim 
Signed-off-by: Jungtaek Lim 
---
 .../spark/sql/streaming/StreamingQuery.scala   |   4 +-
 .../org/apache/spark/sql/streaming/progress.scala  | 310 -
 .../CheckConnectJvmClientCompatibility.scala   |   5 -
 .../streaming/StreamingQueryProgressSuite.scala| 227 +++
 .../spark/sql/streaming/StreamingQuerySuite.scala  |  23 ++
 .../sql/connect/planner/SparkConnectPlanner.scala  |   5 +-
 .../org/apache/spark/sql/streaming/progress.scala  |  17 ++
 .../StreamingQueryStatusAndProgressSuite.scala |  58 
 8 files changed, 638 insertions(+), 11 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index b57e6239b77..ceb096b9aff 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -205,14 +205,14 @@ class RemoteStreamingQuery(
 
   override def recentProgress: Array[StreamingQueryProgress] = {
 
executeQueryCmd(_.setRecentProgress(true)).getRecentProgress.getRecentProgressJsonList.asScala
-  .map(json => new StreamingQueryProgress(json))
+  .map(StreamingQueryProgress.fromJson)
   .toArray
   }
 
   override def lastProgress: StreamingQueryProgress = {
 executeQueryCmd(
   
_.setLastProgress(true)).getRecentProgress.getRecentProgressJsonList.asScala.headOption
-  .map(json => new StreamingQueryProgress(json))
+  .map(StreamingQueryProgress.fromJson)
   .orNull
   }
 
diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 974bcd64b29..593311efb9c 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -17,6 +17,312 @@
 
 package org.apache.spark.sql.streaming
 
-class StreamingQueryProgress private[sql] (val json: String) {
-  // TODO(SPARK-43128): (Implement full object by parsing from json).
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import com.fasterxml.jackson.module.scala.{ClassTagExtensions, 
DefaultScalaModule}
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, 
safeMapToJValue}
+import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
+
+/**
+ * Information about updates made to stateful operators in a 
[[StreamingQuery]] during a trigger.
+ */
+@Evolving
+class StateOperatorProgress private[spark] (
+val operatorName: String,
+val numRowsTotal: Long,
+val numRowsUpdated: Long,
+val allUpdatesTimeMs: Long,
+val numRowsRemoved: Long,
+val allRemovalsTimeMs: Long,
+val commitTimeMs: Long,
+val memoryUsedBytes: Long,
+val numRowsDr