[spark] branch master updated: [SPARK-44792][BUILD] Upgrade curator to 5.2.0

2023-08-13 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a50170b888f [SPARK-44792][BUILD] Upgrade curator to 5.2.0
a50170b888f is described below

commit a50170b888f414aa9f41ff3ec597ff639ddc9b1f
Author: Yuming Wang 
AuthorDate: Mon Aug 14 13:42:07 2023 +0800

[SPARK-44792][BUILD] Upgrade curator to 5.2.0

### What changes were proposed in this pull request?

This PR upgrades curator to 5.2.0 to make it consistent with Hadoop 3.3.6.

Please see: https://issues.apache.org/jira/browse/HADOOP-18515

### Why are the changes needed?

To fix a potential compatibility issue.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing test.
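
    As an additional manual sanity check (not part of this PR), the Curator version that is actually resolved onto the classpath can be read from the jar manifest. A minimal Scala sketch, assuming the curator-framework jar ships an Implementation-Version entry (typical for ASF release jars):

```scala
object CuratorVersionCheck {
  def main(args: Array[String]): Unit = {
    // Read the version recorded in curator-framework's manifest, if any.
    val version = Option(classOf[org.apache.curator.framework.CuratorFramework].getPackage)
      .flatMap(p => Option(p.getImplementationVersion))
      .getOrElse("unknown (no Implementation-Version in manifest)")
    println(s"Curator on classpath: $version")
  }
}
```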

Closes #42474 from wangyum/SPARK-44792.

Authored-by: Yuming Wang 
Signed-off-by: Yuming Wang 
---
 dev/deps/spark-deps-hadoop-3-hive-2.3 | 6 +++---
 pom.xml   | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3
index 3e8131b2734..d0b30a69452 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -51,9 +51,9 @@ commons-math3/3.6.1//commons-math3-3.6.1.jar
 commons-pool/1.5.4//commons-pool-1.5.4.jar
 commons-text/1.10.0//commons-text-1.10.0.jar
 compress-lzf/1.1.2//compress-lzf-1.1.2.jar
-curator-client/2.13.0//curator-client-2.13.0.jar
-curator-framework/2.13.0//curator-framework-2.13.0.jar
-curator-recipes/2.13.0//curator-recipes-2.13.0.jar
+curator-client/5.2.0//curator-client-5.2.0.jar
+curator-framework/5.2.0//curator-framework-5.2.0.jar
+curator-recipes/5.2.0//curator-recipes-5.2.0.jar
 datanucleus-api-jdo/4.2.4//datanucleus-api-jdo-4.2.4.jar
 datanucleus-core/4.1.17//datanucleus-core-4.1.17.jar
 datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar
diff --git a/pom.xml b/pom.xml
index f25fb6b5f62..524913f49eb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,7 +128,7 @@
 3.11.4
 ${hadoop.version}
 3.6.3
-2.13.0
+5.2.0
 org.apache.hive
 core
 





[spark] 03/03: Revert "Add test"

2023-08-13 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 481546385a605143d3a6103fa2aaf9936612465d
Author: Herman van Hovell 
AuthorDate: Mon Aug 14 05:39:15 2023 +0200

Revert "Add test"

This reverts commit 2c465f681ada8be8cb53edfff3ddbd273b89bc72.
---
 .../scala/org/apache/spark/sql/application/ReplE2ESuite.scala| 9 -
 1 file changed, 9 deletions(-)

diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
index 0cab66eef3d..0e69b5afa45 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
@@ -276,13 +276,4 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach {
     val output = runCommandsInShell(input)
     assertContains("Array[MyTestClass] = Array(MyTestClass(1), MyTestClass(3))", output)
   }
-
-  test("REPL class in UDF") {
-val input = """
-|case class MyTestClass(value: Int)
-|spark.range(2).map(i => MyTestClass(i.toInt)).collect()
-  """.stripMargin
-val output = runCommandsInShell(input)
-assertContains("Array[MyTestClass] = Array(MyTestClass(0), MyTestClass(1))", output)
-  }
 }





[spark] 02/03: Revert "Add classloader Id to code generation cache."

2023-08-13 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 3c1ea33a3bfd5c355fff635499c48905b24ca3e8
Author: Herman van Hovell 
AuthorDate: Mon Aug 14 05:39:06 2023 +0200

Revert "Add classloader Id to code generation cache."

This reverts commit 6d4891b32cee585523a51a5304d6aa3c47bb8af8.
---
 .../expressions/codegen/CodeGenerator.scala| 40 --
 1 file changed, 14 insertions(+), 26 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 59688cae889..8d10f6cd295 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.catalyst.expressions.codegen
 
 import java.io.ByteArrayInputStream
-import java.util.UUID
 
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
@@ -26,7 +25,6 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
-import com.google.common.cache.{CacheBuilder, CacheLoader}
 import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
 import org.codehaus.commons.compiler.{CompileException, InternalCompilerException}
 import org.codehaus.janino.ClassBodyEvaluator
@@ -1441,7 +1439,7 @@ object CodeGenerator extends Logging {
* @return a pair of a generated class and the bytecode statistics of generated functions.
*/
   def compile(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = try {
-cache.get((classLoaderUUID.get(Utils.getContextOrSparkClassLoader), code))
+cache.get(code)
   } catch {
 // Cache.get() may wrap the original exception. See the following URL
 // https://guava.dev/releases/14.0.1/api/docs/com/google/common/cache/
@@ -1583,30 +1581,20 @@ object CodeGenerator extends Logging {
* aborted. See [[NonFateSharingCache]] for more details.
*/
   private val cache = {
-val loadFunc: ((ClassLoaderId, CodeAndComment)) => (GeneratedClass, ByteCodeStats) = {
-  case (_, code) =>
-val startTime = System.nanoTime()
-val result = doCompile(code)
-val endTime = System.nanoTime()
-val duration = endTime - startTime
-val timeMs: Double = duration.toDouble / NANOS_PER_MILLIS
-CodegenMetrics.METRIC_SOURCE_CODE_SIZE.update(code.body.length)
-CodegenMetrics.METRIC_COMPILATION_TIME.update(timeMs.toLong)
-logInfo(s"Code generated in $timeMs ms")
-_compileTime.add(duration)
-result
+def loadFunc: CodeAndComment => (GeneratedClass, ByteCodeStats) = code => {
+  val startTime = System.nanoTime()
+  val result = doCompile(code)
+  val endTime = System.nanoTime()
+  val duration = endTime - startTime
+  val timeMs: Double = duration.toDouble / NANOS_PER_MILLIS
+  CodegenMetrics.METRIC_SOURCE_CODE_SIZE.update(code.body.length)
+  CodegenMetrics.METRIC_COMPILATION_TIME.update(timeMs.toLong)
+  logInfo(s"Code generated in $timeMs ms")
+  _compileTime.add(duration)
+  result
 }
-NonFateSharingCache(loadFunc, SQLConf.get.codegenCacheMaxEntries)
-  }
-
-  type ClassLoaderId = String
-  private val classLoaderUUID = {
-NonFateSharingCache(CacheBuilder.newBuilder()
-  .weakKeys
-  .maximumSize(SQLConf.get.codegenCacheMaxEntries)
-  .build(new CacheLoader[ClassLoader, ClassLoaderId]() {
-override def load(code: ClassLoader): ClassLoaderId = UUID.randomUUID.toString
-  }))
+NonFateSharingCache[CodeAndComment, (GeneratedClass, ByteCodeStats)](
+  loadFunc, SQLConf.get.codegenCacheMaxEntries)
   }
 
   /**





[spark] branch master updated (890748873bd -> 481546385a6)

2023-08-13 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 890748873bd Refine solution
 new 8a0e0591c83 Revert "Refine solution"
 new 3c1ea33a3bf Revert "Add classloader Id to code generation cache."
 new 481546385a6 Revert "Add test"

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../spark/sql/application/ReplE2ESuite.scala   |  9 
 .../spark/sql/catalyst/encoders/OuterScopes.scala  | 49 +-
 .../expressions/codegen/CodeGenerator.scala| 30 +++--
 3 files changed, 34 insertions(+), 54 deletions(-)





[spark] 01/03: Revert "Refine solution"

2023-08-13 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 8a0e0591c83f450b75b0c066ba50fc2d3f20b290
Author: Herman van Hovell 
AuthorDate: Mon Aug 14 05:38:54 2023 +0200

Revert "Refine solution"

This reverts commit 890748873bd8bd72b34d3f907ecdb72a694234c9.
---
 .../spark/sql/catalyst/encoders/OuterScopes.scala  | 49 +-
 .../expressions/codegen/CodeGenerator.scala| 18 ++--
 2 files changed, 34 insertions(+), 33 deletions(-)

diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/OuterScopes.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/OuterScopes.scala
index 6c10e8ece80..c2ac504c846 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/OuterScopes.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/OuterScopes.scala
@@ -26,9 +26,28 @@ import org.apache.spark.util.SparkClassUtils
 
 object OuterScopes {
   private[this] val queue = new ReferenceQueue[AnyRef]
+  private class HashableWeakReference(v: AnyRef) extends WeakReference[AnyRef](v, queue) {
+private[this] val hash = v.hashCode()
+override def hashCode(): Int = hash
+override def equals(obj: Any): Boolean = {
+  obj match {
+case other: HashableWeakReference =>
+  // Note that referential equality is used to identify & purge
+  // references from the map whose' referent went out of scope.
+  if (this eq other) {
+true
+  } else {
+val referent = get()
+val otherReferent = other.get()
+referent != null && otherReferent != null && Objects.equals(referent, otherReferent)
+  }
+case _ => false
+  }
+}
+  }
 
   private def classLoaderRef(c: Class[_]): HashableWeakReference = {
-new HashableWeakReference(c.getClassLoader, queue)
+new HashableWeakReference(c.getClassLoader)
   }
 
   private[this] val outerScopes = {
@@ -135,31 +154,3 @@ object OuterScopes {
   // e.g. `ammonite.$sess.cmd8$Helper$Foo` -> `ammonite.$sess.cmd8.instance.Foo`
   private[this] val AmmoniteREPLClass = """^(ammonite\.\$sess\.cmd(?:\d+)\$).*""".r
 }
-
-/**
- * A [[WeakReference]] that has a stable hash-key. When the referent is still alive we will use
- * the referent for equality, once it is dead it we will fallback to referential equality. This
- * way you can still do lookups in a map when the referent is alive, and are capable of removing
- * dead entries after GC (using a [[ReferenceQueue]]).
- */
-private[catalyst] class HashableWeakReference(v: AnyRef, queue: ReferenceQueue[AnyRef])
-  extends WeakReference[AnyRef](v, queue) {
-  def this(v: AnyRef) = this(v, null)
-  private[this] val hash = v.hashCode()
-  override def hashCode(): Int = hash
-  override def equals(obj: Any): Boolean = {
-obj match {
-  case other: HashableWeakReference =>
-// Note that referential equality is used to identify & purge
-// references from the map whose' referent went out of scope.
-if (this eq other) {
-  true
-} else {
-  val referent = get()
-  val otherReferent = other.get()
-  referent != null && otherReferent != null && Objects.equals(referent, otherReferent)
-}
-  case _ => false
-}
-  }
-}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index fe61cc81359..59688cae889 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions.codegen
 
 import java.io.ByteArrayInputStream
+import java.util.UUID
 
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
@@ -25,6 +26,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
+import com.google.common.cache.{CacheBuilder, CacheLoader}
 import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
 import org.codehaus.commons.compiler.{CompileException, InternalCompilerException}
 import org.codehaus.janino.ClassBodyEvaluator
@@ -35,7 +37,6 @@ import org.apache.spark.executor.InputMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.CodegenMetrics
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.HashableWeakReference
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.types._
@@ 

[spark] 01/03: Add test

2023-08-13 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 2c465f681ada8be8cb53edfff3ddbd273b89bc72
Author: Herman van Hovell 
AuthorDate: Mon Aug 14 04:59:59 2023 +0200

Add test
---
 .../scala/org/apache/spark/sql/application/ReplE2ESuite.scala| 9 +
 1 file changed, 9 insertions(+)

diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
index 0e69b5afa45..0cab66eef3d 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
@@ -276,4 +276,13 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach {
     val output = runCommandsInShell(input)
     assertContains("Array[MyTestClass] = Array(MyTestClass(1), MyTestClass(3))", output)
   }
+
+  test("REPL class in UDF") {
+val input = """
+|case class MyTestClass(value: Int)
+|spark.range(2).map(i => MyTestClass(i.toInt)).collect()
+  """.stripMargin
+val output = runCommandsInShell(input)
+assertContains("Array[MyTestClass] = Array(MyTestClass(0), MyTestClass(1))", output)
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (6bd95d0e004 -> 890748873bd)

2023-08-13 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 6bd95d0e004 [SPARK-44775][PYTHON][DOCS] Add missing version information in DataFrame APIs
 new 2c465f681ad Add test
 new 6d4891b32ce Add classloader Id to code generation cache.
 new 890748873bd Refine solution

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../spark/sql/application/ReplE2ESuite.scala   |  9 
 .../spark/sql/catalyst/encoders/OuterScopes.scala  | 49 +-
 .../expressions/codegen/CodeGenerator.scala| 30 ++---
 3 files changed, 54 insertions(+), 34 deletions(-)





[spark] 02/03: Add classloader Id to code generation cache.

2023-08-13 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 6d4891b32cee585523a51a5304d6aa3c47bb8af8
Author: Herman van Hovell 
AuthorDate: Sun Aug 13 02:49:19 2023 +0200

Add classloader Id to code generation cache.
---
 .../expressions/codegen/CodeGenerator.scala| 40 ++
 1 file changed, 26 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 8d10f6cd295..59688cae889 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions.codegen
 
 import java.io.ByteArrayInputStream
+import java.util.UUID
 
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
@@ -25,6 +26,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
+import com.google.common.cache.{CacheBuilder, CacheLoader}
 import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
 import org.codehaus.commons.compiler.{CompileException, InternalCompilerException}
 import org.codehaus.janino.ClassBodyEvaluator
@@ -1439,7 +1441,7 @@ object CodeGenerator extends Logging {
* @return a pair of a generated class and the bytecode statistics of generated functions.
*/
   def compile(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = try {
-cache.get(code)
+cache.get((classLoaderUUID.get(Utils.getContextOrSparkClassLoader), code))
   } catch {
 // Cache.get() may wrap the original exception. See the following URL
 // https://guava.dev/releases/14.0.1/api/docs/com/google/common/cache/
@@ -1581,20 +1583,30 @@ object CodeGenerator extends Logging {
* aborted. See [[NonFateSharingCache]] for more details.
*/
   private val cache = {
-def loadFunc: CodeAndComment => (GeneratedClass, ByteCodeStats) = code => {
-  val startTime = System.nanoTime()
-  val result = doCompile(code)
-  val endTime = System.nanoTime()
-  val duration = endTime - startTime
-  val timeMs: Double = duration.toDouble / NANOS_PER_MILLIS
-  CodegenMetrics.METRIC_SOURCE_CODE_SIZE.update(code.body.length)
-  CodegenMetrics.METRIC_COMPILATION_TIME.update(timeMs.toLong)
-  logInfo(s"Code generated in $timeMs ms")
-  _compileTime.add(duration)
-  result
+val loadFunc: ((ClassLoaderId, CodeAndComment)) => (GeneratedClass, ByteCodeStats) = {
+  case (_, code) =>
+val startTime = System.nanoTime()
+val result = doCompile(code)
+val endTime = System.nanoTime()
+val duration = endTime - startTime
+val timeMs: Double = duration.toDouble / NANOS_PER_MILLIS
+CodegenMetrics.METRIC_SOURCE_CODE_SIZE.update(code.body.length)
+CodegenMetrics.METRIC_COMPILATION_TIME.update(timeMs.toLong)
+logInfo(s"Code generated in $timeMs ms")
+_compileTime.add(duration)
+result
 }
-NonFateSharingCache[CodeAndComment, (GeneratedClass, ByteCodeStats)](
-  loadFunc, SQLConf.get.codegenCacheMaxEntries)
+NonFateSharingCache(loadFunc, SQLConf.get.codegenCacheMaxEntries)
+  }
+
+  type ClassLoaderId = String
+  private val classLoaderUUID = {
+NonFateSharingCache(CacheBuilder.newBuilder()
+  .weakKeys
+  .maximumSize(SQLConf.get.codegenCacheMaxEntries)
+  .build(new CacheLoader[ClassLoader, ClassLoaderId]() {
+override def load(code: ClassLoader): ClassLoaderId = UUID.randomUUID.toString
+  }))
   }
 
   /**





[spark] 03/03: Refine solution

2023-08-13 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 890748873bd8bd72b34d3f907ecdb72a694234c9
Author: Herman van Hovell 
AuthorDate: Mon Aug 14 05:32:57 2023 +0200

Refine solution
---
 .../spark/sql/catalyst/encoders/OuterScopes.scala  | 49 +-
 .../expressions/codegen/CodeGenerator.scala| 18 ++--
 2 files changed, 33 insertions(+), 34 deletions(-)

diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/OuterScopes.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/OuterScopes.scala
index c2ac504c846..6c10e8ece80 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/OuterScopes.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/OuterScopes.scala
@@ -26,28 +26,9 @@ import org.apache.spark.util.SparkClassUtils
 
 object OuterScopes {
   private[this] val queue = new ReferenceQueue[AnyRef]
-  private class HashableWeakReference(v: AnyRef) extends WeakReference[AnyRef](v, queue) {
-private[this] val hash = v.hashCode()
-override def hashCode(): Int = hash
-override def equals(obj: Any): Boolean = {
-  obj match {
-case other: HashableWeakReference =>
-  // Note that referential equality is used to identify & purge
-  // references from the map whose' referent went out of scope.
-  if (this eq other) {
-true
-  } else {
-val referent = get()
-val otherReferent = other.get()
-referent != null && otherReferent != null && Objects.equals(referent, otherReferent)
-  }
-case _ => false
-  }
-}
-  }
 
   private def classLoaderRef(c: Class[_]): HashableWeakReference = {
-new HashableWeakReference(c.getClassLoader)
+new HashableWeakReference(c.getClassLoader, queue)
   }
 
   private[this] val outerScopes = {
@@ -154,3 +135,31 @@ object OuterScopes {
   // e.g. `ammonite.$sess.cmd8$Helper$Foo` -> `ammonite.$sess.cmd8.instance.Foo`
   private[this] val AmmoniteREPLClass = """^(ammonite\.\$sess\.cmd(?:\d+)\$).*""".r
 }
+
+/**
+ * A [[WeakReference]] that has a stable hash-key. When the referent is still alive we will use
+ * the referent for equality, once it is dead it we will fallback to referential equality. This
+ * way you can still do lookups in a map when the referent is alive, and are capable of removing
+ * dead entries after GC (using a [[ReferenceQueue]]).
+ */
+private[catalyst] class HashableWeakReference(v: AnyRef, queue: ReferenceQueue[AnyRef])
+  extends WeakReference[AnyRef](v, queue) {
+  def this(v: AnyRef) = this(v, null)
+  private[this] val hash = v.hashCode()
+  override def hashCode(): Int = hash
+  override def equals(obj: Any): Boolean = {
+obj match {
+  case other: HashableWeakReference =>
+// Note that referential equality is used to identify & purge
+// references from the map whose' referent went out of scope.
+if (this eq other) {
+  true
+} else {
+  val referent = get()
+  val otherReferent = other.get()
+  referent != null && otherReferent != null && Objects.equals(referent, otherReferent)
+}
+  case _ => false
+}
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 59688cae889..fe61cc81359 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.catalyst.expressions.codegen
 
 import java.io.ByteArrayInputStream
-import java.util.UUID
 
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
@@ -26,7 +25,6 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
-import com.google.common.cache.{CacheBuilder, CacheLoader}
 import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
 import org.codehaus.commons.compiler.{CompileException, InternalCompilerException}
 import org.codehaus.janino.ClassBodyEvaluator
@@ -37,6 +35,7 @@ import org.apache.spark.executor.InputMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.CodegenMetrics
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.HashableWeakReference
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.types._
@@ -1441,7 +1440,8 @@ object CodeGenerator extends Logging {
* @return a pair 

[spark] branch branch-3.5 updated: [SPARK-44775][PYTHON][DOCS] Add missing version information in DataFrame APIs

2023-08-13 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 0742644d21b [SPARK-44775][PYTHON][DOCS] Add missing version information in DataFrame APIs
0742644d21b is described below

commit 0742644d21b816c8c94ebaf5c789e2ec4e30b099
Author: Ruifeng Zheng 
AuthorDate: Mon Aug 14 10:40:15 2023 +0800

[SPARK-44775][PYTHON][DOCS] Add missing version information in DataFrame APIs

### What changes were proposed in this pull request?
Add missing version information in DataFrame APIs

### Why are the changes needed?
to improve docs

### Does this PR introduce _any_ user-facing change?
yes

### How was this patch tested?
CI

Closes #42451 from zhengruifeng/doc_df_api_versions.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
(cherry picked from commit 6bd95d0e004505840aa0749107aa76f3a17958be)
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/connect/dataframe.py | 16 ++--
 python/pyspark/sql/dataframe.py | 30 +-
 2 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 14d9c2c9d05..7b326538a8e 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -2023,22 +2023,10 @@ class DataFrame:
 
 # SparkConnect specific API
 def offset(self, n: int) -> "DataFrame":
-"""Returns a new :class: `DataFrame` by skipping the first `n` rows.
-
-.. versionadded:: 3.4.0
-
-Parameters
---
-num : int
-Number of records to skip.
-
-Returns
----
-:class:`DataFrame`
-Subset of the records
-"""
 return DataFrame.withPlan(plan.Offset(child=self._plan, offset=n), session=self._session)
 
+offset.__doc__ = PySparkDataFrame.offset.__doc__
+
 @classmethod
 def withPlan(cls, plan: plan.LogicalPlan, session: "SparkSession") -> "DataFrame":
 """
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index f6fe17539c6..8be2c224265 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -516,6 +516,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
 .. versionadded:: 2.0.0
 
+.. versionchanged:: 3.5.0
+Supports Spark Connect.
+
 Notes
 -
 This API is evolving.
@@ -1304,7 +1307,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 def offset(self, num: int) -> "DataFrame":
 """Returns a new :class: `DataFrame` by skipping the first `n` rows.
 
-.. versionadded:: 3.5.0
+.. versionadded:: 3.4.0
+
+.. versionchanged:: 3.5.0
+Supports vanilla PySpark.
 
 Parameters
 --
@@ -3540,6 +3546,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
 .. versionadded:: 3.4.0
 
+.. versionchanged:: 3.4.0
+Supports Spark Connect.
+
 Parameters
 --
 ids : str, Column, tuple, list, optional
@@ -3631,6 +3640,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
 .. versionadded:: 3.3.0
 
+.. versionchanged:: 3.5.0
+Supports Spark Connect.
+
 Parameters
 --
 observation : :class:`Observation` or str
@@ -4066,6 +4078,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
  .. versionadded:: 3.5.0
 
+.. versionchanged:: 3.5.0
+Supports Spark Connect.
+
  Parameters
  --
  subset : List of column names, optional
@@ -5276,6 +5291,8 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 def toDF(self, *cols: str) -> "DataFrame":
 """Returns a new :class:`DataFrame` that with new specified column 
names
 
+.. versionadded:: 1.6.0
+
 .. versionchanged:: 3.4.0
 Supports Spark Connect.
 
@@ -5381,6 +5398,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
 .. versionadded:: 3.1.0
 
+.. versionchanged:: 3.5.0
+Supports Spark Connect.
+
 Notes
 -
 The equality comparison here is simplified by tolerating the cosmetic differences
@@ -5426,6 +5446,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
 .. versionadded:: 3.1.0
 
+.. versionchanged:: 3.5.0
+Supports Spark Connect.
+
 Notes
 -
 Unlike the standard hash code, the hash is calculated against the query plan
@@ -5549,6 +5572,11 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 """

[spark] branch master updated: [SPARK-44775][PYTHON][DOCS] Add missing version information in DataFrame APIs

2023-08-13 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6bd95d0e004 [SPARK-44775][PYTHON][DOCS] Add missing version information in DataFrame APIs
6bd95d0e004 is described below

commit 6bd95d0e004505840aa0749107aa76f3a17958be
Author: Ruifeng Zheng 
AuthorDate: Mon Aug 14 10:40:15 2023 +0800

[SPARK-44775][PYTHON][DOCS] Add missing version information in DataFrame APIs

### What changes were proposed in this pull request?
Add missing version information in DataFrame APIs

### Why are the changes needed?
to improve docs

### Does this PR introduce _any_ user-facing change?
yes

### How was this patch tested?
CI

Closes #42451 from zhengruifeng/doc_df_api_versions.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/connect/dataframe.py | 16 ++--
 python/pyspark/sql/dataframe.py | 30 +-
 2 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 14d9c2c9d05..7b326538a8e 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -2023,22 +2023,10 @@ class DataFrame:
 
 # SparkConnect specific API
 def offset(self, n: int) -> "DataFrame":
-"""Returns a new :class: `DataFrame` by skipping the first `n` rows.
-
-.. versionadded:: 3.4.0
-
-Parameters
---
-num : int
-Number of records to skip.
-
-Returns
----
-:class:`DataFrame`
-Subset of the records
-"""
 return DataFrame.withPlan(plan.Offset(child=self._plan, offset=n), session=self._session)
 
+offset.__doc__ = PySparkDataFrame.offset.__doc__
+
 @classmethod
 def withPlan(cls, plan: plan.LogicalPlan, session: "SparkSession") -> "DataFrame":
 """
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index f6fe17539c6..8be2c224265 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -516,6 +516,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
 .. versionadded:: 2.0.0
 
+.. versionchanged:: 3.5.0
+Supports Spark Connect.
+
 Notes
 -
 This API is evolving.
@@ -1304,7 +1307,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 def offset(self, num: int) -> "DataFrame":
 """Returns a new :class: `DataFrame` by skipping the first `n` rows.
 
-.. versionadded:: 3.5.0
+.. versionadded:: 3.4.0
+
+.. versionchanged:: 3.5.0
+Supports vanilla PySpark.
 
 Parameters
 --
@@ -3540,6 +3546,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
 .. versionadded:: 3.4.0
 
+.. versionchanged:: 3.4.0
+Supports Spark Connect.
+
 Parameters
 --
 ids : str, Column, tuple, list, optional
@@ -3631,6 +3640,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
 .. versionadded:: 3.3.0
 
+.. versionchanged:: 3.5.0
+Supports Spark Connect.
+
 Parameters
 --
 observation : :class:`Observation` or str
@@ -4066,6 +4078,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
  .. versionadded:: 3.5.0
 
+.. versionchanged:: 3.5.0
+Supports Spark Connect.
+
  Parameters
  --
  subset : List of column names, optional
@@ -5276,6 +5291,8 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 def toDF(self, *cols: str) -> "DataFrame":
 """Returns a new :class:`DataFrame` that with new specified column 
names
 
+.. versionadded:: 1.6.0
+
 .. versionchanged:: 3.4.0
 Supports Spark Connect.
 
@@ -5381,6 +5398,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
 .. versionadded:: 3.1.0
 
+.. versionchanged:: 3.5.0
+Supports Spark Connect.
+
 Notes
 -
 The equality comparison here is simplified by tolerating the cosmetic differences
@@ -5426,6 +5446,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
 .. versionadded:: 3.1.0
 
+.. versionchanged:: 3.5.0
+Supports Spark Connect.
+
 Notes
 -
 Unlike the standard hash code, the hash is calculated against the query plan
@@ -5549,6 +5572,11 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 """
 Converts the existing DataFrame into a pandas-on-Spark DataFrame.
 
+.. versionadded:: 3.2.0
+
+  

[spark] branch branch-3.5 updated: [SPARK-44791][CONNECT] Make ArrowDeserializer work with REPL generated classes

2023-08-13 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new f0bb1391fe4 [SPARK-44791][CONNECT] Make ArrowDeserializer work with REPL generated classes
f0bb1391fe4 is described below

commit f0bb1391fe460fee886bce9151a47e89e75de671
Author: Herman van Hovell 
AuthorDate: Mon Aug 14 02:38:54 2023 +0200

[SPARK-44791][CONNECT] Make ArrowDeserializer work with REPL generated classes

### What changes were proposed in this pull request?
Connects arrow deserialization currently does not work with REPL generated 
classes. For example the following code would fail:
```scala
case class MyTestClass(value: Int) {
  override def toString: String = value.toString
}
spark.range(10).map(i => MyTestClass(i.toInt)).collect()
```

The problem is that for instantiation of the `MyTestClass` class we need 
the instance of the class that it was defined in (its outerscope). In Spark we 
have a mechanism called `OuterScopes` to register these instances in. The 
`ArrowDeserializer` was not resolving this outer instance. This PR fixes this.

We have a similar issue on the executor/driver side. This will be fixed in 
a different PR.

### Why are the changes needed?
It is a bug.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
I have added tests to `ReplE2ESuite` and to the `ArrowEncoderSuite`.

Closes #42473 from hvanhovell/SPARK-44791.

Authored-by: Herman van Hovell 
Signed-off-by: Herman van Hovell 
(cherry picked from commit dcf3d582293c3dbb3820d12fa15b41e8bd5fe6ad)
Signed-off-by: Herman van Hovell 
---
 .../org/apache/spark/util/SparkClassUtils.scala| 28 +++
 .../connect/client/arrow/ArrowDeserializer.scala   | 14 +++-
 .../spark/sql/application/ReplE2ESuite.scala   | 33 -
 .../connect/client/arrow/ArrowEncoderSuite.scala   | 12 ++-
 .../main/scala/org/apache/spark/util/Utils.scala   | 28 ---
 .../spark/sql/catalyst/encoders/OuterScopes.scala  | 85 +-
 .../apache/spark/sql/errors/ExecutionErrors.scala  |  7 ++
 .../spark/sql/errors/QueryExecutionErrors.scala|  7 --
 8 files changed, 138 insertions(+), 76 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala
index a237869aef3..679d546d04c 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala
@@ -50,6 +50,34 @@ trait SparkClassUtils {
   def classIsLoadable(clazz: String): Boolean = {
 Try { classForName(clazz, initialize = false) }.isSuccess
   }
+
+  /**
+   * Returns true if and only if the underlying class is a member class.
+   *
+   * Note: jdk8u throws a "Malformed class name" error if a given class is a deeply-nested
+   * inner class (See SPARK-34607 for details). This issue has already been fixed in jdk9+, so
+   * we can remove this helper method safely if we drop the support of jdk8u.
+   */
+  def isMemberClass(cls: Class[_]): Boolean = {
+try {
+  cls.isMemberClass
+} catch {
+  case _: InternalError =>
+// We emulate jdk8u `Class.isMemberClass` below:
+//   public boolean isMemberClass() {
+// return getSimpleBinaryName() != null && !isLocalOrAnonymousClass();
+//   }
+// `getSimpleBinaryName()` returns null if a given class is a top-level class,
+// so we replace it with `cls.getEnclosingClass != null`. The second condition checks
+// if a given class is not a local or an anonymous class, so we replace it with
+// `cls.getEnclosingMethod == null` because `cls.getEnclosingMethod()` return a value
+// only in either case (JVM Spec 4.8.6).
+//
+// Note: The newer jdk evaluates `!isLocalOrAnonymousClass()` first,
+// we reorder the conditions to follow it.
+cls.getEnclosingMethod == null && cls.getEnclosingClass != null
+}
+  }
 }
 
 object SparkClassUtils extends SparkClassUtils
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala
index 55dd640f1b6..82086b9d47a 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala
@@ -34,7 +34,7 @@ import org.apache.arrow.vector.ipc.ArrowReader
 import org.apache.arrow.vector.util.Text
 
 import 

[spark] branch master updated: [SPARK-44791][CONNECT] Make ArrowDeserializer work with REPL generated classes

2023-08-13 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new dcf3d582293 [SPARK-44791][CONNECT] Make ArrowDeserializer work with REPL generated classes
dcf3d582293 is described below

commit dcf3d582293c3dbb3820d12fa15b41e8bd5fe6ad
Author: Herman van Hovell 
AuthorDate: Mon Aug 14 02:38:54 2023 +0200

[SPARK-44791][CONNECT] Make ArrowDeserializer work with REPL generated classes

### What changes were proposed in this pull request?
Spark Connect's Arrow deserialization currently does not work with REPL-generated classes. For example, the following code would fail:
```scala
case class MyTestClass(value: Int) {
  override def toString: String = value.toString
}
spark.range(10).map(i => MyTestClass(i.toInt)).collect()
```

The problem is that to instantiate the `MyTestClass` class we need an instance of the class it was defined in (its outer scope). Spark has a mechanism called `OuterScopes` for registering these instances. The `ArrowDeserializer` was not resolving this outer instance; this PR fixes that.

We have a similar issue on the executor/driver side. This will be fixed in a different PR.
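
For context, a stripped-down illustration of what such an outer-scope registry can look like. This is a hedged sketch with hypothetical names, not Spark's actual `OuterScopes` implementation:

```scala
import java.lang.ref.WeakReference
import scala.collection.concurrent.TrieMap

// Hypothetical registry: the REPL registers the instance that encloses user-defined
// classes, and a deserializer later looks it up by the enclosing class name so it
// can pass the outer pointer to the inner class's constructor.
object OuterScopeRegistrySketch {
  private val scopes = TrieMap.empty[String, WeakReference[AnyRef]]

  def addOuterScope(outer: AnyRef): Unit =
    scopes.update(outer.getClass.getName, new WeakReference[AnyRef](outer))

  def outerInstanceFor(innerClass: Class[_]): Option[AnyRef] =
    Option(innerClass.getEnclosingClass)           // null for top-level classes
      .flatMap(enclosing => scopes.get(enclosing.getName))
      .flatMap(ref => Option(ref.get()))           // referent may already be collected
}
```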

### Why are the changes needed?
It is a bug.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
I have added tests to `ReplE2ESuite` and to the `ArrowEncoderSuite`.

Closes #42473 from hvanhovell/SPARK-44791.

Authored-by: Herman van Hovell 
Signed-off-by: Herman van Hovell 
---
 .../org/apache/spark/util/SparkClassUtils.scala| 28 +++
 .../connect/client/arrow/ArrowDeserializer.scala   | 14 +++-
 .../spark/sql/application/ReplE2ESuite.scala   | 33 -
 .../connect/client/arrow/ArrowEncoderSuite.scala   | 12 ++-
 .../main/scala/org/apache/spark/util/Utils.scala   | 28 ---
 .../spark/sql/catalyst/encoders/OuterScopes.scala  | 85 +-
 .../apache/spark/sql/errors/ExecutionErrors.scala  |  7 ++
 .../spark/sql/errors/QueryExecutionErrors.scala|  7 --
 8 files changed, 138 insertions(+), 76 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala
index a237869aef3..679d546d04c 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala
@@ -50,6 +50,34 @@ trait SparkClassUtils {
   def classIsLoadable(clazz: String): Boolean = {
 Try { classForName(clazz, initialize = false) }.isSuccess
   }
+
+  /**
+   * Returns true if and only if the underlying class is a member class.
+   *
+   * Note: jdk8u throws a "Malformed class name" error if a given class is a deeply-nested
+   * inner class (See SPARK-34607 for details). This issue has already been fixed in jdk9+, so
+   * we can remove this helper method safely if we drop the support of jdk8u.
+   */
+  def isMemberClass(cls: Class[_]): Boolean = {
+try {
+  cls.isMemberClass
+} catch {
+  case _: InternalError =>
+// We emulate jdk8u `Class.isMemberClass` below:
+//   public boolean isMemberClass() {
+// return getSimpleBinaryName() != null && !isLocalOrAnonymousClass();
+//   }
+// `getSimpleBinaryName()` returns null if a given class is a top-level class,
+// so we replace it with `cls.getEnclosingClass != null`. The second condition checks
+// if a given class is not a local or an anonymous class, so we replace it with
+// `cls.getEnclosingMethod == null` because `cls.getEnclosingMethod()` return a value
+// only in either case (JVM Spec 4.8.6).
+//
+// Note: The newer jdk evaluates `!isLocalOrAnonymousClass()` first,
+// we reorder the conditions to follow it.
+cls.getEnclosingMethod == null && cls.getEnclosingClass != null
+}
+  }
 }
 
 object SparkClassUtils extends SparkClassUtils
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala
index 55dd640f1b6..82086b9d47a 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala
@@ -34,7 +34,7 @@ import org.apache.arrow.vector.ipc.ArrowReader
 import org.apache.arrow.vector.util.Text
 
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
+import 

[spark] branch branch-3.5 updated: [SPARK-44736][CONNECT] Add Dataset.explode to Spark Connect Scala Client

2023-08-13 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new b0b15475a0a [SPARK-44736][CONNECT] Add Dataset.explode to Spark Connect Scala Client
b0b15475a0a is described below

commit b0b15475a0ac2d73b829491532747a249498c1a6
Author: Herman van Hovell 
AuthorDate: Sun Aug 13 20:27:08 2023 +0200

[SPARK-44736][CONNECT] Add Dataset.explode to Spark Connect Scala Client

### What changes were proposed in this pull request?
This PR adds Dataset.explode to the Spark Connect Scala Client.

### Why are the changes needed?
To increase compatibility with the existing Dataset API in sql/core.

### Does this PR introduce _any_ user-facing change?
Yes, it adds a new method to the scala client.

### How was this patch tested?
I added a test to `UserDefinedFunctionE2ETestSuite`.

Closes #42418 from hvanhovell/SPARK-44736.

Lead-authored-by: Herman van Hovell 
Co-authored-by: itholic 
Co-authored-by: Juliusz Sompolski 
Co-authored-by: Martin Grund 
Co-authored-by: Hyukjin Kwon 
Co-authored-by: Kent Yao 
Co-authored-by: Wenchen Fan 
Co-authored-by: Wei Liu 
Co-authored-by: Ruifeng Zheng 
Co-authored-by: Gengliang Wang 
Co-authored-by: Yuming Wang 
Co-authored-by: Herman van Hovell 
Co-authored-by: 余良 
Co-authored-by: Dongjoon Hyun 
Co-authored-by: Jack Chen 
Co-authored-by: srielau 
Co-authored-by: zhyhimont 
Co-authored-by: Daniel Tenedorio 
Co-authored-by: Dongjoon Hyun 
Co-authored-by: Zhyhimont Dmitry 
Co-authored-by: Sandip Agarwala <131817656+sandip...@users.noreply.github.com>
Co-authored-by: yangjie01 
Co-authored-by: Yihong He 
Co-authored-by: Rameshkrishnan Muthusamy 

Co-authored-by: Jia Fan 
Co-authored-by: allisonwang-db 
Co-authored-by: Utkarsh 
Co-authored-by: Cheng Pan 
Co-authored-by: Jason Li 
Co-authored-by: Shu Wang 
Co-authored-by: Nicolas Fraison 
Co-authored-by: Max Gekk 
Co-authored-by: panbingkun 
Co-authored-by: Ziqi Liu 
Signed-off-by: Herman van Hovell 
(cherry picked from commit f496cd1ee2a7e59af08e1bd3ab0579f93cc46da9)
Signed-off-by: Herman van Hovell 
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 70 ++
 .../sql/UserDefinedFunctionE2ETestSuite.scala  | 60 +++
 .../CheckConnectJvmClientCompatibility.scala   |  1 -
 .../apache/spark/sql/connect/common/UdfUtils.scala |  4 ++
 .../sql/connect/planner/SparkConnectPlanner.scala  |  3 +-
 5 files changed, 136 insertions(+), 2 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 2d72ea6bda8..28b04fb850e 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -21,12 +21,14 @@ import java.util.{Collections, Locale}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
 
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.java.function._
 import org.apache.spark.connect.proto
+import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
 import org.apache.spark.sql.catalyst.expressions.OrderUtils
@@ -2728,6 +2730,74 @@ class Dataset[T] private[sql] (
 flatMap(UdfUtils.flatMapFuncToScalaFunc(f))(encoder)
   }
 
+  /**
+   * (Scala-specific) Returns a new Dataset where each row has been expanded to zero or more rows
+   * by the provided function. This is similar to a `LATERAL VIEW` in HiveQL. The columns of the
+   * input row are implicitly joined with each row that is output by the function.
+   *
+   * Given that this is deprecated, as an alternative, you can explode columns either using
+   * `functions.explode()` or `flatMap()`. The following example uses these alternatives to count
+   * the number of books that contain a given word:
+   *
+   * {{{
+   *   case class Book(title: String, words: String)
+   *   val ds: Dataset[Book]
+   *
+   *   val allWords = ds.select($"title", explode(split($"words", " ")).as("word"))
+   *
+   *   val bookCountPerWord = allWords.groupBy("word").agg(count_distinct("title"))
+   * }}}
+   *
+   * Using `flatMap()` this can similarly be exploded as:
+   *
+   * {{{
+   *   ds.flatMap(_.words.split(" "))
+   * }}}
+   *
+   * @group untypedrel
+   * @since 3.5.0

[spark] branch master updated: [SPARK-44736][CONNECT] Add Dataset.explode to Spark Connect Scala Client

2023-08-13 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f496cd1ee2a [SPARK-44736][CONNECT] Add Dataset.explode to Spark Connect Scala Client
f496cd1ee2a is described below

commit f496cd1ee2a7e59af08e1bd3ab0579f93cc46da9
Author: Herman van Hovell 
AuthorDate: Sun Aug 13 20:27:08 2023 +0200

[SPARK-44736][CONNECT] Add Dataset.explode to Spark Connect Scala Client

### What changes were proposed in this pull request?
This PR adds Dataset.explode to the Spark Connect Scala Client.

### Why are the changes needed?
To increase compatibility with the existing Dataset API in sql/core.

### Does this PR introduce _any_ user-facing change?
Yes, it adds a new method to the scala client.

### How was this patch tested?
I added a test to `UserDefinedFunctionE2ETestSuite`.

Closes #42418 from hvanhovell/SPARK-44736.

Lead-authored-by: Herman van Hovell 
Co-authored-by: itholic 
Co-authored-by: Juliusz Sompolski 
Co-authored-by: Martin Grund 
Co-authored-by: Hyukjin Kwon 
Co-authored-by: Kent Yao 
Co-authored-by: Wenchen Fan 
Co-authored-by: Wei Liu 
Co-authored-by: Ruifeng Zheng 
Co-authored-by: Gengliang Wang 
Co-authored-by: Yuming Wang 
Co-authored-by: Herman van Hovell 
Co-authored-by: 余良 
Co-authored-by: Dongjoon Hyun 
Co-authored-by: Jack Chen 
Co-authored-by: srielau 
Co-authored-by: zhyhimont 
Co-authored-by: Daniel Tenedorio 
Co-authored-by: Dongjoon Hyun 
Co-authored-by: Zhyhimont Dmitry 
Co-authored-by: Sandip Agarwala <131817656+sandip...@users.noreply.github.com>
Co-authored-by: yangjie01 
Co-authored-by: Yihong He 
Co-authored-by: Rameshkrishnan Muthusamy 

Co-authored-by: Jia Fan 
Co-authored-by: allisonwang-db 
Co-authored-by: Utkarsh 
Co-authored-by: Cheng Pan 
Co-authored-by: Jason Li 
Co-authored-by: Shu Wang 
Co-authored-by: Nicolas Fraison 
Co-authored-by: Max Gekk 
Co-authored-by: panbingkun 
Co-authored-by: Ziqi Liu 
Signed-off-by: Herman van Hovell 
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 70 ++
 .../sql/UserDefinedFunctionE2ETestSuite.scala  | 60 +++
 .../CheckConnectJvmClientCompatibility.scala   |  1 -
 .../apache/spark/sql/connect/common/UdfUtils.scala |  4 ++
 .../sql/connect/planner/SparkConnectPlanner.scala  |  3 +-
 5 files changed, 136 insertions(+), 2 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 2d72ea6bda8..28b04fb850e 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -21,12 +21,14 @@ import java.util.{Collections, Locale}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
 
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.java.function._
 import org.apache.spark.connect.proto
+import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._
 import org.apache.spark.sql.catalyst.expressions.OrderUtils
@@ -2728,6 +2730,74 @@ class Dataset[T] private[sql] (
 flatMap(UdfUtils.flatMapFuncToScalaFunc(f))(encoder)
   }
 
+  /**
+   * (Scala-specific) Returns a new Dataset where each row has been expanded to zero or more rows
+   * by the provided function. This is similar to a `LATERAL VIEW` in HiveQL. The columns of the
+   * input row are implicitly joined with each row that is output by the function.
+   *
+   * Given that this is deprecated, as an alternative, you can explode columns either using
+   * `functions.explode()` or `flatMap()`. The following example uses these alternatives to count
+   * the number of books that contain a given word:
+   *
+   * {{{
+   *   case class Book(title: String, words: String)
+   *   val ds: Dataset[Book]
+   *
+   *   val allWords = ds.select($"title", explode(split($"words", " ")).as("word"))
+   *
+   *   val bookCountPerWord = allWords.groupBy("word").agg(count_distinct("title"))
+   * }}}
+   *
+   * Using `flatMap()` this can similarly be exploded as:
+   *
+   * {{{
+   *   ds.flatMap(_.words.split(" "))
+   * }}}
+   *
+   * @group untypedrel
+   * @since 3.5.0
+   */
+  @deprecated("use flatMap() or select() with functions.explode() instead", "3.5.0")
+  def explode[A <: