[spark] branch master updated: [SPARK-44792][BUILD] Upgrade curator to 5.2.0
This is an automated email from the ASF dual-hosted git repository. yumwang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a50170b888f [SPARK-44792][BUILD] Upgrade curator to 5.2.0 a50170b888f is described below commit a50170b888f414aa9f41ff3ec597ff639ddc9b1f Author: Yuming Wang AuthorDate: Mon Aug 14 13:42:07 2023 +0800 [SPARK-44792][BUILD] Upgrade curator to 5.2.0 ### What changes were proposed in this pull request? This PR upgrades curator to 5.2.0 to make it consistent with Hadoop 3.3.6. Please see: https://issues.apache.org/jira/browse/HADOOP-18515 ### Why are the changes needed? To fix potential compatibility issue. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing test. Closes #42474 from wangyum/SPARK-44792. Authored-by: Yuming Wang Signed-off-by: Yuming Wang --- dev/deps/spark-deps-hadoop-3-hive-2.3 | 6 +++--- pom.xml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index 3e8131b2734..d0b30a69452 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -51,9 +51,9 @@ commons-math3/3.6.1//commons-math3-3.6.1.jar commons-pool/1.5.4//commons-pool-1.5.4.jar commons-text/1.10.0//commons-text-1.10.0.jar compress-lzf/1.1.2//compress-lzf-1.1.2.jar -curator-client/2.13.0//curator-client-2.13.0.jar -curator-framework/2.13.0//curator-framework-2.13.0.jar -curator-recipes/2.13.0//curator-recipes-2.13.0.jar +curator-client/5.2.0//curator-client-5.2.0.jar +curator-framework/5.2.0//curator-framework-5.2.0.jar +curator-recipes/5.2.0//curator-recipes-5.2.0.jar datanucleus-api-jdo/4.2.4//datanucleus-api-jdo-4.2.4.jar datanucleus-core/4.1.17//datanucleus-core-4.1.17.jar datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar diff --git a/pom.xml b/pom.xml index 
f25fb6b5f62..524913f49eb 100644 --- a/pom.xml +++ b/pom.xml @@ -128,7 +128,7 @@ 3.11.4 ${hadoop.version} 3.6.3 -2.13.0 +5.2.0 org.apache.hive core - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] 03/03: Revert "Add test"
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git commit 481546385a605143d3a6103fa2aaf9936612465d Author: Herman van Hovell AuthorDate: Mon Aug 14 05:39:15 2023 +0200 Revert "Add test" This reverts commit 2c465f681ada8be8cb53edfff3ddbd273b89bc72. --- .../scala/org/apache/spark/sql/application/ReplE2ESuite.scala| 9 - 1 file changed, 9 deletions(-) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala index 0cab66eef3d..0e69b5afa45 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala @@ -276,13 +276,4 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach { val output = runCommandsInShell(input) assertContains("Array[MyTestClass] = Array(MyTestClass(1), MyTestClass(3))", output) } - - test("REPL class in UDF") { -val input = """ -|case class MyTestClass(value: Int) -|spark.range(2).map(i => MyTestClass(i.toInt)).collect() - """.stripMargin -val output = runCommandsInShell(input) -assertContains("Array[MyTestClass] = Array(MyTestClass(0), MyTestClass(1))", output) - } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] 02/03: Revert "Add classloader Id to code generation cache."
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git commit 3c1ea33a3bfd5c355fff635499c48905b24ca3e8 Author: Herman van Hovell AuthorDate: Mon Aug 14 05:39:06 2023 +0200 Revert "Add classloader Id to code generation cache." This reverts commit 6d4891b32cee585523a51a5304d6aa3c47bb8af8. --- .../expressions/codegen/CodeGenerator.scala| 40 -- 1 file changed, 14 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 59688cae889..8d10f6cd295 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.catalyst.expressions.codegen import java.io.ByteArrayInputStream -import java.util.UUID import scala.annotation.tailrec import scala.collection.JavaConverters._ @@ -26,7 +25,6 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal -import com.google.common.cache.{CacheBuilder, CacheLoader} import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} import org.codehaus.commons.compiler.{CompileException, InternalCompilerException} import org.codehaus.janino.ClassBodyEvaluator @@ -1441,7 +1439,7 @@ object CodeGenerator extends Logging { * @return a pair of a generated class and the bytecode statistics of generated functions. */ def compile(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = try { -cache.get((classLoaderUUID.get(Utils.getContextOrSparkClassLoader), code)) +cache.get(code) } catch { // Cache.get() may wrap the original exception. 
See the following URL // https://guava.dev/releases/14.0.1/api/docs/com/google/common/cache/ @@ -1583,30 +1581,20 @@ object CodeGenerator extends Logging { * aborted. See [[NonFateSharingCache]] for more details. */ private val cache = { -val loadFunc: ((ClassLoaderId, CodeAndComment)) => (GeneratedClass, ByteCodeStats) = { - case (_, code) => -val startTime = System.nanoTime() -val result = doCompile(code) -val endTime = System.nanoTime() -val duration = endTime - startTime -val timeMs: Double = duration.toDouble / NANOS_PER_MILLIS -CodegenMetrics.METRIC_SOURCE_CODE_SIZE.update(code.body.length) -CodegenMetrics.METRIC_COMPILATION_TIME.update(timeMs.toLong) -logInfo(s"Code generated in $timeMs ms") -_compileTime.add(duration) -result +def loadFunc: CodeAndComment => (GeneratedClass, ByteCodeStats) = code => { + val startTime = System.nanoTime() + val result = doCompile(code) + val endTime = System.nanoTime() + val duration = endTime - startTime + val timeMs: Double = duration.toDouble / NANOS_PER_MILLIS + CodegenMetrics.METRIC_SOURCE_CODE_SIZE.update(code.body.length) + CodegenMetrics.METRIC_COMPILATION_TIME.update(timeMs.toLong) + logInfo(s"Code generated in $timeMs ms") + _compileTime.add(duration) + result } -NonFateSharingCache(loadFunc, SQLConf.get.codegenCacheMaxEntries) - } - - type ClassLoaderId = String - private val classLoaderUUID = { -NonFateSharingCache(CacheBuilder.newBuilder() - .weakKeys - .maximumSize(SQLConf.get.codegenCacheMaxEntries) - .build(new CacheLoader[ClassLoader, ClassLoaderId]() { -override def load(code: ClassLoader): ClassLoaderId = UUID.randomUUID.toString - })) +NonFateSharingCache[CodeAndComment, (GeneratedClass, ByteCodeStats)]( + loadFunc, SQLConf.get.codegenCacheMaxEntries) } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (890748873bd -> 481546385a6)
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 890748873bd Refine solution new 8a0e0591c83 Revert "Refine solution" new 3c1ea33a3bf Revert "Add classloader Id to code generation cache." new 481546385a6 Revert "Add test" The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../spark/sql/application/ReplE2ESuite.scala | 9 .../spark/sql/catalyst/encoders/OuterScopes.scala | 49 +- .../expressions/codegen/CodeGenerator.scala| 30 +++-- 3 files changed, 34 insertions(+), 54 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] 01/03: Revert "Refine solution"
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git commit 8a0e0591c83f450b75b0c066ba50fc2d3f20b290 Author: Herman van Hovell AuthorDate: Mon Aug 14 05:38:54 2023 +0200 Revert "Refine solution" This reverts commit 890748873bd8bd72b34d3f907ecdb72a694234c9. --- .../spark/sql/catalyst/encoders/OuterScopes.scala | 49 +- .../expressions/codegen/CodeGenerator.scala| 18 ++-- 2 files changed, 34 insertions(+), 33 deletions(-) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/OuterScopes.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/OuterScopes.scala index 6c10e8ece80..c2ac504c846 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/OuterScopes.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/OuterScopes.scala @@ -26,9 +26,28 @@ import org.apache.spark.util.SparkClassUtils object OuterScopes { private[this] val queue = new ReferenceQueue[AnyRef] + private class HashableWeakReference(v: AnyRef) extends WeakReference[AnyRef](v, queue) { +private[this] val hash = v.hashCode() +override def hashCode(): Int = hash +override def equals(obj: Any): Boolean = { + obj match { +case other: HashableWeakReference => + // Note that referential equality is used to identify & purge + // references from the map whose' referent went out of scope. + if (this eq other) { +true + } else { +val referent = get() +val otherReferent = other.get() +referent != null && otherReferent != null && Objects.equals(referent, otherReferent) + } +case _ => false + } +} + } private def classLoaderRef(c: Class[_]): HashableWeakReference = { -new HashableWeakReference(c.getClassLoader, queue) +new HashableWeakReference(c.getClassLoader) } private[this] val outerScopes = { @@ -135,31 +154,3 @@ object OuterScopes { // e.g. 
`ammonite.$sess.cmd8$Helper$Foo` -> `ammonite.$sess.cmd8.instance.Foo` private[this] val AmmoniteREPLClass = """^(ammonite\.\$sess\.cmd(?:\d+)\$).*""".r } - -/** - * A [[WeakReference]] that has a stable hash-key. When the referent is still alive we will use - * the referent for equality, once it is dead it we will fallback to referential equality. This - * way you can still do lookups in a map when the referent is alive, and are capable of removing - * dead entries after GC (using a [[ReferenceQueue]]). - */ -private[catalyst] class HashableWeakReference(v: AnyRef, queue: ReferenceQueue[AnyRef]) - extends WeakReference[AnyRef](v, queue) { - def this(v: AnyRef) = this(v, null) - private[this] val hash = v.hashCode() - override def hashCode(): Int = hash - override def equals(obj: Any): Boolean = { -obj match { - case other: HashableWeakReference => -// Note that referential equality is used to identify & purge -// references from the map whose' referent went out of scope. -if (this eq other) { - true -} else { - val referent = get() - val otherReferent = other.get() - referent != null && otherReferent != null && Objects.equals(referent, otherReferent) -} - case _ => false -} - } -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index fe61cc81359..59688cae889 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen import java.io.ByteArrayInputStream +import java.util.UUID import scala.annotation.tailrec import scala.collection.JavaConverters._ @@ -25,6 +26,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal 
+import com.google.common.cache.{CacheBuilder, CacheLoader} import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} import org.codehaus.commons.compiler.{CompileException, InternalCompilerException} import org.codehaus.janino.ClassBodyEvaluator @@ -35,7 +37,6 @@ import org.apache.spark.executor.InputMetrics import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.CodegenMetrics import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.encoders.HashableWeakReference import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.types._ @@
[spark] 01/03: Add test
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git commit 2c465f681ada8be8cb53edfff3ddbd273b89bc72 Author: Herman van Hovell AuthorDate: Mon Aug 14 04:59:59 2023 +0200 Add test --- .../scala/org/apache/spark/sql/application/ReplE2ESuite.scala| 9 + 1 file changed, 9 insertions(+) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala index 0e69b5afa45..0cab66eef3d 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala @@ -276,4 +276,13 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach { val output = runCommandsInShell(input) assertContains("Array[MyTestClass] = Array(MyTestClass(1), MyTestClass(3))", output) } + + test("REPL class in UDF") { +val input = """ +|case class MyTestClass(value: Int) +|spark.range(2).map(i => MyTestClass(i.toInt)).collect() + """.stripMargin +val output = runCommandsInShell(input) +assertContains("Array[MyTestClass] = Array(MyTestClass(0), MyTestClass(1))", output) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (6bd95d0e004 -> 890748873bd)
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 6bd95d0e004 [SPARK-44775][PYTHON][DOCS] Add missing version information in DataFrame APIs new 2c465f681ad Add test new 6d4891b32ce Add classloader Id to code generation cache. new 890748873bd Refine solution The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../spark/sql/application/ReplE2ESuite.scala | 9 .../spark/sql/catalyst/encoders/OuterScopes.scala | 49 +- .../expressions/codegen/CodeGenerator.scala| 30 ++--- 3 files changed, 54 insertions(+), 34 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] 02/03: Add classloader Id to code generation cache.
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git commit 6d4891b32cee585523a51a5304d6aa3c47bb8af8 Author: Herman van Hovell AuthorDate: Sun Aug 13 02:49:19 2023 +0200 Add classloader Id to code generation cache. --- .../expressions/codegen/CodeGenerator.scala| 40 ++ 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 8d10f6cd295..59688cae889 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen import java.io.ByteArrayInputStream +import java.util.UUID import scala.annotation.tailrec import scala.collection.JavaConverters._ @@ -25,6 +26,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal +import com.google.common.cache.{CacheBuilder, CacheLoader} import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} import org.codehaus.commons.compiler.{CompileException, InternalCompilerException} import org.codehaus.janino.ClassBodyEvaluator @@ -1439,7 +1441,7 @@ object CodeGenerator extends Logging { * @return a pair of a generated class and the bytecode statistics of generated functions. */ def compile(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = try { -cache.get(code) +cache.get((classLoaderUUID.get(Utils.getContextOrSparkClassLoader), code)) } catch { // Cache.get() may wrap the original exception. 
See the following URL // https://guava.dev/releases/14.0.1/api/docs/com/google/common/cache/ @@ -1581,20 +1583,30 @@ object CodeGenerator extends Logging { * aborted. See [[NonFateSharingCache]] for more details. */ private val cache = { -def loadFunc: CodeAndComment => (GeneratedClass, ByteCodeStats) = code => { - val startTime = System.nanoTime() - val result = doCompile(code) - val endTime = System.nanoTime() - val duration = endTime - startTime - val timeMs: Double = duration.toDouble / NANOS_PER_MILLIS - CodegenMetrics.METRIC_SOURCE_CODE_SIZE.update(code.body.length) - CodegenMetrics.METRIC_COMPILATION_TIME.update(timeMs.toLong) - logInfo(s"Code generated in $timeMs ms") - _compileTime.add(duration) - result +val loadFunc: ((ClassLoaderId, CodeAndComment)) => (GeneratedClass, ByteCodeStats) = { + case (_, code) => +val startTime = System.nanoTime() +val result = doCompile(code) +val endTime = System.nanoTime() +val duration = endTime - startTime +val timeMs: Double = duration.toDouble / NANOS_PER_MILLIS +CodegenMetrics.METRIC_SOURCE_CODE_SIZE.update(code.body.length) +CodegenMetrics.METRIC_COMPILATION_TIME.update(timeMs.toLong) +logInfo(s"Code generated in $timeMs ms") +_compileTime.add(duration) +result } -NonFateSharingCache[CodeAndComment, (GeneratedClass, ByteCodeStats)]( - loadFunc, SQLConf.get.codegenCacheMaxEntries) +NonFateSharingCache(loadFunc, SQLConf.get.codegenCacheMaxEntries) + } + + type ClassLoaderId = String + private val classLoaderUUID = { +NonFateSharingCache(CacheBuilder.newBuilder() + .weakKeys + .maximumSize(SQLConf.get.codegenCacheMaxEntries) + .build(new CacheLoader[ClassLoader, ClassLoaderId]() { +override def load(code: ClassLoader): ClassLoaderId = UUID.randomUUID.toString + })) } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] 03/03: Refine solution
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git commit 890748873bd8bd72b34d3f907ecdb72a694234c9 Author: Herman van Hovell AuthorDate: Mon Aug 14 05:32:57 2023 +0200 Refine solution --- .../spark/sql/catalyst/encoders/OuterScopes.scala | 49 +- .../expressions/codegen/CodeGenerator.scala| 18 ++-- 2 files changed, 33 insertions(+), 34 deletions(-) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/OuterScopes.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/OuterScopes.scala index c2ac504c846..6c10e8ece80 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/OuterScopes.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/OuterScopes.scala @@ -26,28 +26,9 @@ import org.apache.spark.util.SparkClassUtils object OuterScopes { private[this] val queue = new ReferenceQueue[AnyRef] - private class HashableWeakReference(v: AnyRef) extends WeakReference[AnyRef](v, queue) { -private[this] val hash = v.hashCode() -override def hashCode(): Int = hash -override def equals(obj: Any): Boolean = { - obj match { -case other: HashableWeakReference => - // Note that referential equality is used to identify & purge - // references from the map whose' referent went out of scope. - if (this eq other) { -true - } else { -val referent = get() -val otherReferent = other.get() -referent != null && otherReferent != null && Objects.equals(referent, otherReferent) - } -case _ => false - } -} - } private def classLoaderRef(c: Class[_]): HashableWeakReference = { -new HashableWeakReference(c.getClassLoader) +new HashableWeakReference(c.getClassLoader, queue) } private[this] val outerScopes = { @@ -154,3 +135,31 @@ object OuterScopes { // e.g. 
`ammonite.$sess.cmd8$Helper$Foo` -> `ammonite.$sess.cmd8.instance.Foo` private[this] val AmmoniteREPLClass = """^(ammonite\.\$sess\.cmd(?:\d+)\$).*""".r } + +/** + * A [[WeakReference]] that has a stable hash-key. When the referent is still alive we will use + * the referent for equality, once it is dead it we will fallback to referential equality. This + * way you can still do lookups in a map when the referent is alive, and are capable of removing + * dead entries after GC (using a [[ReferenceQueue]]). + */ +private[catalyst] class HashableWeakReference(v: AnyRef, queue: ReferenceQueue[AnyRef]) + extends WeakReference[AnyRef](v, queue) { + def this(v: AnyRef) = this(v, null) + private[this] val hash = v.hashCode() + override def hashCode(): Int = hash + override def equals(obj: Any): Boolean = { +obj match { + case other: HashableWeakReference => +// Note that referential equality is used to identify & purge +// references from the map whose' referent went out of scope. +if (this eq other) { + true +} else { + val referent = get() + val otherReferent = other.get() + referent != null && otherReferent != null && Objects.equals(referent, otherReferent) +} + case _ => false +} + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 59688cae889..fe61cc81359 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.catalyst.expressions.codegen import java.io.ByteArrayInputStream -import java.util.UUID import scala.annotation.tailrec import scala.collection.JavaConverters._ @@ -26,7 +25,6 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal 
-import com.google.common.cache.{CacheBuilder, CacheLoader} import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException} import org.codehaus.commons.compiler.{CompileException, InternalCompilerException} import org.codehaus.janino.ClassBodyEvaluator @@ -37,6 +35,7 @@ import org.apache.spark.executor.InputMetrics import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.CodegenMetrics import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.HashableWeakReference import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.types._ @@ -1441,7 +1440,8 @@ object CodeGenerator extends Logging { * @return a pair
[spark] branch branch-3.5 updated: [SPARK-44775][PYTHON][DOCS] Add missing version information in DataFrame APIs
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 0742644d21b [SPARK-44775][PYTHON][DOCS] Add missing version information in DataFrame APIs 0742644d21b is described below commit 0742644d21b816c8c94ebaf5c789e2ec4e30b099 Author: Ruifeng Zheng AuthorDate: Mon Aug 14 10:40:15 2023 +0800 [SPARK-44775][PYTHON][DOCS] Add missing version information in DataFrame APIs ### What changes were proposed in this pull request? Add missing version information in DataFrame APIs ### Why are the changes needed? to improve docs ### Does this PR introduce _any_ user-facing change? yes ### How was this patch tested? CI Closes #42451 from zhengruifeng/doc_df_api_versions. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng (cherry picked from commit 6bd95d0e004505840aa0749107aa76f3a17958be) Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/connect/dataframe.py | 16 ++-- python/pyspark/sql/dataframe.py | 30 +- 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 14d9c2c9d05..7b326538a8e 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -2023,22 +2023,10 @@ class DataFrame: # SparkConnect specific API def offset(self, n: int) -> "DataFrame": -"""Returns a new :class: `DataFrame` by skipping the first `n` rows. - -.. versionadded:: 3.4.0 - -Parameters --- -num : int -Number of records to skip. 
- -Returns ---- -:class:`DataFrame` -Subset of the records -""" return DataFrame.withPlan(plan.Offset(child=self._plan, offset=n), session=self._session) +offset.__doc__ = PySparkDataFrame.offset.__doc__ + @classmethod def withPlan(cls, plan: plan.LogicalPlan, session: "SparkSession") -> "DataFrame": """ diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index f6fe17539c6..8be2c224265 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -516,6 +516,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): .. versionadded:: 2.0.0 +.. versionchanged:: 3.5.0 +Supports Spark Connect. + Notes - This API is evolving. @@ -1304,7 +1307,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): def offset(self, num: int) -> "DataFrame": """Returns a new :class: `DataFrame` by skipping the first `n` rows. -.. versionadded:: 3.5.0 +.. versionadded:: 3.4.0 + +.. versionchanged:: 3.5.0 +Supports vanilla PySpark. Parameters -- @@ -3540,6 +3546,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): .. versionadded:: 3.4.0 +.. versionchanged:: 3.4.0 +Supports Spark Connect. + Parameters -- ids : str, Column, tuple, list, optional @@ -3631,6 +3640,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): .. versionadded:: 3.3.0 +.. versionchanged:: 3.5.0 +Supports Spark Connect. + Parameters -- observation : :class:`Observation` or str @@ -4066,6 +4078,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): .. versionadded:: 3.5.0 +.. versionchanged:: 3.5.0 +Supports Spark Connect. + Parameters -- subset : List of column names, optional @@ -5276,6 +5291,8 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): def toDF(self, *cols: str) -> "DataFrame": """Returns a new :class:`DataFrame` that with new specified column names +.. versionadded:: 1.6.0 + .. versionchanged:: 3.4.0 Supports Spark Connect. @@ -5381,6 +5398,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): .. 
versionadded:: 3.1.0 +.. versionchanged:: 3.5.0 +Supports Spark Connect. + Notes - The equality comparison here is simplified by tolerating the cosmetic differences @@ -5426,6 +5446,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): .. versionadded:: 3.1.0 +.. versionchanged:: 3.5.0 +Supports Spark Connect. + Notes - Unlike the standard hash code, the hash is calculated against the query plan @@ -5549,6 +5572,11 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): """
[spark] branch master updated: [SPARK-44775][PYTHON][DOCS] Add missing version information in DataFrame APIs
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6bd95d0e004 [SPARK-44775][PYTHON][DOCS] Add missing version information in DataFrame APIs 6bd95d0e004 is described below commit 6bd95d0e004505840aa0749107aa76f3a17958be Author: Ruifeng Zheng AuthorDate: Mon Aug 14 10:40:15 2023 +0800 [SPARK-44775][PYTHON][DOCS] Add missing version information in DataFrame APIs ### What changes were proposed in this pull request? Add missing version information in DataFrame APIs ### Why are the changes needed? to improve docs ### Does this PR introduce _any_ user-facing change? yes ### How was this patch tested? CI Closes #42451 from zhengruifeng/doc_df_api_versions. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/connect/dataframe.py | 16 ++-- python/pyspark/sql/dataframe.py | 30 +- 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 14d9c2c9d05..7b326538a8e 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -2023,22 +2023,10 @@ class DataFrame: # SparkConnect specific API def offset(self, n: int) -> "DataFrame": -"""Returns a new :class: `DataFrame` by skipping the first `n` rows. - -.. versionadded:: 3.4.0 - -Parameters --- -num : int -Number of records to skip. 
- -Returns ---- -:class:`DataFrame` -Subset of the records -""" return DataFrame.withPlan(plan.Offset(child=self._plan, offset=n), session=self._session) +offset.__doc__ = PySparkDataFrame.offset.__doc__ + @classmethod def withPlan(cls, plan: plan.LogicalPlan, session: "SparkSession") -> "DataFrame": """ diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index f6fe17539c6..8be2c224265 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -516,6 +516,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): .. versionadded:: 2.0.0 +.. versionchanged:: 3.5.0 +Supports Spark Connect. + Notes - This API is evolving. @@ -1304,7 +1307,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): def offset(self, num: int) -> "DataFrame": """Returns a new :class: `DataFrame` by skipping the first `n` rows. -.. versionadded:: 3.5.0 +.. versionadded:: 3.4.0 + +.. versionchanged:: 3.5.0 +Supports vanilla PySpark. Parameters -- @@ -3540,6 +3546,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): .. versionadded:: 3.4.0 +.. versionchanged:: 3.4.0 +Supports Spark Connect. + Parameters -- ids : str, Column, tuple, list, optional @@ -3631,6 +3640,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): .. versionadded:: 3.3.0 +.. versionchanged:: 3.5.0 +Supports Spark Connect. + Parameters -- observation : :class:`Observation` or str @@ -4066,6 +4078,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): .. versionadded:: 3.5.0 +.. versionchanged:: 3.5.0 +Supports Spark Connect. + Parameters -- subset : List of column names, optional @@ -5276,6 +5291,8 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): def toDF(self, *cols: str) -> "DataFrame": """Returns a new :class:`DataFrame` that with new specified column names +.. versionadded:: 1.6.0 + .. versionchanged:: 3.4.0 Supports Spark Connect. @@ -5381,6 +5398,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): .. 
versionadded:: 3.1.0 +.. versionchanged:: 3.5.0 +Supports Spark Connect. + Notes - The equality comparison here is simplified by tolerating the cosmetic differences @@ -5426,6 +5446,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): .. versionadded:: 3.1.0 +.. versionchanged:: 3.5.0 +Supports Spark Connect. + Notes - Unlike the standard hash code, the hash is calculated against the query plan @@ -5549,6 +5572,11 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): """ Converts the existing DataFrame into a pandas-on-Spark DataFrame. +.. versionadded:: 3.2.0 + +
[spark] branch branch-3.5 updated: [SPARK-44791][CONNECT] Make ArrowDeserializer work with REPL generated classes
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new f0bb1391fe4 [SPARK-44791][CONNECT] Make ArrowDeserializer work with REPL generated classes f0bb1391fe4 is described below commit f0bb1391fe460fee886bce9151a47e89e75de671 Author: Herman van Hovell AuthorDate: Mon Aug 14 02:38:54 2023 +0200 [SPARK-44791][CONNECT] Make ArrowDeserializer work with REPL generated classes ### What changes were proposed in this pull request? Connect's arrow deserialization currently does not work with REPL generated classes. For example the following code would fail: ```scala case class MyTestClass(value: Int) { override def toString: String = value.toString } spark.range(10).map(i => MyTestClass(i.toInt)).collect() ``` The problem is that for instantiation of the `MyTestClass` class we need the instance of the class that it was defined in (its outer scope). In Spark we have a mechanism called `OuterScopes` to register these instances in. The `ArrowDeserializer` was not resolving this outer instance. This PR fixes this. We have a similar issue on the executor/driver side. This will be fixed in a different PR. ### Why are the changes needed? It is a bug. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I have added tests to `ReplE2ESuite` and to the `ArrowEncoderSuite`. Closes #42473 from hvanhovell/SPARK-44791. 
Authored-by: Herman van Hovell Signed-off-by: Herman van Hovell (cherry picked from commit dcf3d582293c3dbb3820d12fa15b41e8bd5fe6ad) Signed-off-by: Herman van Hovell --- .../org/apache/spark/util/SparkClassUtils.scala| 28 +++ .../connect/client/arrow/ArrowDeserializer.scala | 14 +++- .../spark/sql/application/ReplE2ESuite.scala | 33 - .../connect/client/arrow/ArrowEncoderSuite.scala | 12 ++- .../main/scala/org/apache/spark/util/Utils.scala | 28 --- .../spark/sql/catalyst/encoders/OuterScopes.scala | 85 +- .../apache/spark/sql/errors/ExecutionErrors.scala | 7 ++ .../spark/sql/errors/QueryExecutionErrors.scala| 7 -- 8 files changed, 138 insertions(+), 76 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala index a237869aef3..679d546d04c 100644 --- a/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala +++ b/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala @@ -50,6 +50,34 @@ trait SparkClassUtils { def classIsLoadable(clazz: String): Boolean = { Try { classForName(clazz, initialize = false) }.isSuccess } + + /** + * Returns true if and only if the underlying class is a member class. + * + * Note: jdk8u throws a "Malformed class name" error if a given class is a deeply-nested + * inner class (See SPARK-34607 for details). This issue has already been fixed in jdk9+, so + * we can remove this helper method safely if we drop the support of jdk8u. + */ + def isMemberClass(cls: Class[_]): Boolean = { +try { + cls.isMemberClass +} catch { + case _: InternalError => +// We emulate jdk8u `Class.isMemberClass` below: +// public boolean isMemberClass() { +// return getSimpleBinaryName() != null && !isLocalOrAnonymousClass(); +// } +// `getSimpleBinaryName()` returns null if a given class is a top-level class, +// so we replace it with `cls.getEnclosingClass != null`. 
The second condition checks +// if a given class is not a local or an anonymous class, so we replace it with +// `cls.getEnclosingMethod == null` because `cls.getEnclosingMethod()` return a value +// only in either case (JVM Spec 4.8.6). +// +// Note: The newer jdk evaluates `!isLocalOrAnonymousClass()` first, +// we reorder the conditions to follow it. +cls.getEnclosingMethod == null && cls.getEnclosingClass != null +} + } } object SparkClassUtils extends SparkClassUtils diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala index 55dd640f1b6..82086b9d47a 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala @@ -34,7 +34,7 @@ import org.apache.arrow.vector.ipc.ArrowReader import org.apache.arrow.vector.util.Text import
[spark] branch master updated: [SPARK-44791][CONNECT] Make ArrowDeserializer work with REPL generated classes
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new dcf3d582293 [SPARK-44791][CONNECT] Make ArrowDeserializer work with REPL generated classes dcf3d582293 is described below commit dcf3d582293c3dbb3820d12fa15b41e8bd5fe6ad Author: Herman van Hovell AuthorDate: Mon Aug 14 02:38:54 2023 +0200 [SPARK-44791][CONNECT] Make ArrowDeserializer work with REPL generated classes ### What changes were proposed in this pull request? Connect's arrow deserialization currently does not work with REPL generated classes. For example the following code would fail: ```scala case class MyTestClass(value: Int) { override def toString: String = value.toString } spark.range(10).map(i => MyTestClass(i.toInt)).collect() ``` The problem is that for instantiation of the `MyTestClass` class we need the instance of the class that it was defined in (its outer scope). In Spark we have a mechanism called `OuterScopes` to register these instances in. The `ArrowDeserializer` was not resolving this outer instance. This PR fixes this. We have a similar issue on the executor/driver side. This will be fixed in a different PR. ### Why are the changes needed? It is a bug. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I have added tests to `ReplE2ESuite` and to the `ArrowEncoderSuite`. Closes #42473 from hvanhovell/SPARK-44791. 
Authored-by: Herman van Hovell Signed-off-by: Herman van Hovell --- .../org/apache/spark/util/SparkClassUtils.scala| 28 +++ .../connect/client/arrow/ArrowDeserializer.scala | 14 +++- .../spark/sql/application/ReplE2ESuite.scala | 33 - .../connect/client/arrow/ArrowEncoderSuite.scala | 12 ++- .../main/scala/org/apache/spark/util/Utils.scala | 28 --- .../spark/sql/catalyst/encoders/OuterScopes.scala | 85 +- .../apache/spark/sql/errors/ExecutionErrors.scala | 7 ++ .../spark/sql/errors/QueryExecutionErrors.scala| 7 -- 8 files changed, 138 insertions(+), 76 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala index a237869aef3..679d546d04c 100644 --- a/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala +++ b/common/utils/src/main/scala/org/apache/spark/util/SparkClassUtils.scala @@ -50,6 +50,34 @@ trait SparkClassUtils { def classIsLoadable(clazz: String): Boolean = { Try { classForName(clazz, initialize = false) }.isSuccess } + + /** + * Returns true if and only if the underlying class is a member class. + * + * Note: jdk8u throws a "Malformed class name" error if a given class is a deeply-nested + * inner class (See SPARK-34607 for details). This issue has already been fixed in jdk9+, so + * we can remove this helper method safely if we drop the support of jdk8u. + */ + def isMemberClass(cls: Class[_]): Boolean = { +try { + cls.isMemberClass +} catch { + case _: InternalError => +// We emulate jdk8u `Class.isMemberClass` below: +// public boolean isMemberClass() { +// return getSimpleBinaryName() != null && !isLocalOrAnonymousClass(); +// } +// `getSimpleBinaryName()` returns null if a given class is a top-level class, +// so we replace it with `cls.getEnclosingClass != null`. 
The second condition checks +// if a given class is not a local or an anonymous class, so we replace it with +// `cls.getEnclosingMethod == null` because `cls.getEnclosingMethod()` return a value +// only in either case (JVM Spec 4.8.6). +// +// Note: The newer jdk evaluates `!isLocalOrAnonymousClass()` first, +// we reorder the conditions to follow it. +cls.getEnclosingMethod == null && cls.getEnclosingClass != null +} + } } object SparkClassUtils extends SparkClassUtils diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala index 55dd640f1b6..82086b9d47a 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/arrow/ArrowDeserializer.scala @@ -34,7 +34,7 @@ import org.apache.arrow.vector.ipc.ArrowReader import org.apache.arrow.vector.util.Text import org.apache.spark.sql.catalyst.ScalaReflection -import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder +import
[spark] branch branch-3.5 updated: [SPARK-44736][CONNECT] Add Dataset.explode to Spark Connect Scala Client
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new b0b15475a0a [SPARK-44736][CONNECT] Add Dataset.explode to Spark Connect Scala Client b0b15475a0a is described below commit b0b15475a0ac2d73b829491532747a249498c1a6 Author: Herman van Hovell AuthorDate: Sun Aug 13 20:27:08 2023 +0200 [SPARK-44736][CONNECT] Add Dataset.explode to Spark Connect Scala Client ### What changes were proposed in this pull request? This PR adds Dataset.explode to the Spark Connect Scala Client. ### Why are the changes needed? To increase compatibility with the existing Dataset API in sql/core. ### Does this PR introduce _any_ user-facing change? Yes, it adds a new method to the scala client. ### How was this patch tested? I added a test to `UserDefinedFunctionE2ETestSuite`. Closes #42418 from hvanhovell/SPARK-44736. 
Lead-authored-by: Herman van Hovell Co-authored-by: itholic Co-authored-by: Juliusz Sompolski Co-authored-by: Martin Grund Co-authored-by: Hyukjin Kwon Co-authored-by: Kent Yao Co-authored-by: Wenchen Fan Co-authored-by: Wei Liu Co-authored-by: Ruifeng Zheng Co-authored-by: Gengliang Wang Co-authored-by: Yuming Wang Co-authored-by: Herman van Hovell Co-authored-by: 余良 Co-authored-by: Dongjoon Hyun Co-authored-by: Jack Chen Co-authored-by: srielau Co-authored-by: zhyhimont Co-authored-by: Daniel Tenedorio Co-authored-by: Dongjoon Hyun Co-authored-by: Zhyhimont Dmitry Co-authored-by: Sandip Agarwala <131817656+sandip...@users.noreply.github.com> Co-authored-by: yangjie01 Co-authored-by: Yihong He Co-authored-by: Rameshkrishnan Muthusamy Co-authored-by: Jia Fan Co-authored-by: allisonwang-db Co-authored-by: Utkarsh Co-authored-by: Cheng Pan Co-authored-by: Jason Li Co-authored-by: Shu Wang Co-authored-by: Nicolas Fraison Co-authored-by: Max Gekk Co-authored-by: panbingkun Co-authored-by: Ziqi Liu Signed-off-by: Herman van Hovell (cherry picked from commit f496cd1ee2a7e59af08e1bd3ab0579f93cc46da9) Signed-off-by: Herman van Hovell --- .../main/scala/org/apache/spark/sql/Dataset.scala | 70 ++ .../sql/UserDefinedFunctionE2ETestSuite.scala | 60 +++ .../CheckConnectJvmClientCompatibility.scala | 1 - .../apache/spark/sql/connect/common/UdfUtils.scala | 4 ++ .../sql/connect/planner/SparkConnectPlanner.scala | 3 +- 5 files changed, 136 insertions(+), 2 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index 2d72ea6bda8..28b04fb850e 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -21,12 +21,14 @@ import java.util.{Collections, Locale} import scala.collection.JavaConverters._ import scala.collection.mutable import 
scala.reflect.ClassTag +import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal import org.apache.spark.SparkException import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.java.function._ import org.apache.spark.connect.proto +import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._ import org.apache.spark.sql.catalyst.expressions.OrderUtils @@ -2728,6 +2730,74 @@ class Dataset[T] private[sql] ( flatMap(UdfUtils.flatMapFuncToScalaFunc(f))(encoder) } + /** + * (Scala-specific) Returns a new Dataset where each row has been expanded to zero or more rows + * by the provided function. This is similar to a `LATERAL VIEW` in HiveQL. The columns of the + * input row are implicitly joined with each row that is output by the function. + * + * Given that this is deprecated, as an alternative, you can explode columns either using + * `functions.explode()` or `flatMap()`. The following example uses these alternatives to count + * the number of books that contain a given word: + * + * {{{ + * case class Book(title: String, words: String) + * val ds: Dataset[Book] + * + * val allWords = ds.select($"title", explode(split($"words", " ")).as("word")) + * + * val bookCountPerWord = allWords.groupBy("word").agg(count_distinct("title")) + * }}} + * + * Using `flatMap()` this can similarly be exploded as: + * + * {{{ + * ds.flatMap(_.words.split(" ")) + * }}} + * + * @group untypedrel + * @since 3.5.0
[spark] branch master updated: [SPARK-44736][CONNECT] Add Dataset.explode to Spark Connect Scala Client
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f496cd1ee2a [SPARK-44736][CONNECT] Add Dataset.explode to Spark Connect Scala Client f496cd1ee2a is described below commit f496cd1ee2a7e59af08e1bd3ab0579f93cc46da9 Author: Herman van Hovell AuthorDate: Sun Aug 13 20:27:08 2023 +0200 [SPARK-44736][CONNECT] Add Dataset.explode to Spark Connect Scala Client ### What changes were proposed in this pull request? This PR adds Dataset.explode to the Spark Connect Scala Client. ### Why are the changes needed? To increase compatibility with the existing Dataset API in sql/core. ### Does this PR introduce _any_ user-facing change? Yes, it adds a new method to the scala client. ### How was this patch tested? I added a test to `UserDefinedFunctionE2ETestSuite`. Closes #42418 from hvanhovell/SPARK-44736. Lead-authored-by: Herman van Hovell Co-authored-by: itholic Co-authored-by: Juliusz Sompolski Co-authored-by: Martin Grund Co-authored-by: Hyukjin Kwon Co-authored-by: Kent Yao Co-authored-by: Wenchen Fan Co-authored-by: Wei Liu Co-authored-by: Ruifeng Zheng Co-authored-by: Gengliang Wang Co-authored-by: Yuming Wang Co-authored-by: Herman van Hovell Co-authored-by: 余良 Co-authored-by: Dongjoon Hyun Co-authored-by: Jack Chen Co-authored-by: srielau Co-authored-by: zhyhimont Co-authored-by: Daniel Tenedorio Co-authored-by: Dongjoon Hyun Co-authored-by: Zhyhimont Dmitry Co-authored-by: Sandip Agarwala <131817656+sandip...@users.noreply.github.com> Co-authored-by: yangjie01 Co-authored-by: Yihong He Co-authored-by: Rameshkrishnan Muthusamy Co-authored-by: Jia Fan Co-authored-by: allisonwang-db Co-authored-by: Utkarsh Co-authored-by: Cheng Pan Co-authored-by: Jason Li Co-authored-by: Shu Wang Co-authored-by: Nicolas Fraison Co-authored-by: Max Gekk Co-authored-by: panbingkun Co-authored-by: 
Ziqi Liu Signed-off-by: Herman van Hovell --- .../main/scala/org/apache/spark/sql/Dataset.scala | 70 ++ .../sql/UserDefinedFunctionE2ETestSuite.scala | 60 +++ .../CheckConnectJvmClientCompatibility.scala | 1 - .../apache/spark/sql/connect/common/UdfUtils.scala | 4 ++ .../sql/connect/planner/SparkConnectPlanner.scala | 3 +- 5 files changed, 136 insertions(+), 2 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index 2d72ea6bda8..28b04fb850e 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -21,12 +21,14 @@ import java.util.{Collections, Locale} import scala.collection.JavaConverters._ import scala.collection.mutable import scala.reflect.ClassTag +import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal import org.apache.spark.SparkException import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.java.function._ import org.apache.spark.connect.proto +import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders._ import org.apache.spark.sql.catalyst.expressions.OrderUtils @@ -2728,6 +2730,74 @@ class Dataset[T] private[sql] ( flatMap(UdfUtils.flatMapFuncToScalaFunc(f))(encoder) } + /** + * (Scala-specific) Returns a new Dataset where each row has been expanded to zero or more rows + * by the provided function. This is similar to a `LATERAL VIEW` in HiveQL. The columns of the + * input row are implicitly joined with each row that is output by the function. + * + * Given that this is deprecated, as an alternative, you can explode columns either using + * `functions.explode()` or `flatMap()`. 
The following example uses these alternatives to count + * the number of books that contain a given word: + * + * {{{ + * case class Book(title: String, words: String) + * val ds: Dataset[Book] + * + * val allWords = ds.select($"title", explode(split($"words", " ")).as("word")) + * + * val bookCountPerWord = allWords.groupBy("word").agg(count_distinct("title")) + * }}} + * + * Using `flatMap()` this can similarly be exploded as: + * + * {{{ + * ds.flatMap(_.words.split(" ")) + * }}} + * + * @group untypedrel + * @since 3.5.0 + */ + @deprecated("use flatMap() or select() with functions.explode() instead", "3.5.0") + def explode[A <: