[spark] branch branch-3.4 updated: [SPARK-43522][SQL] Fix creating struct column name with index of array
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
new a18d71a1bc5 [SPARK-43522][SQL] Fix creating struct column name with index of array
a18d71a1bc5 is described below

commit a18d71a1bc5891bda91ad2c196768dfe86e49f30
Author: Jia Fan
AuthorDate: Thu May 18 14:29:30 2023 +0800

[SPARK-43522][SQL] Fix creating struct column name with index of array

### What changes were proposed in this pull request?

When creating a struct column in a DataFrame, code that ran without problems in version 3.3.1 fails in version 3.4.0.

In 3.3.1

```scala
val testDF = Seq("a=b,c=d,d=f").toDF.withColumn("key_value", split('value, ",")).withColumn("map_entry", transform(col("key_value"), x => struct(split(x, "=").getItem(0), split(x, "=").getItem(1))))
testDF.show()
+-----------+---------------+--------------------+
|      value|      key_value|           map_entry|
+-----------+---------------+--------------------+
|a=b,c=d,d=f|[a=b, c=d, d=f]|[{a, b}, {c, d}, ...|
+-----------+---------------+--------------------+
```

In 3.4.0

```
org.apache.spark.sql.AnalysisException: [DATATYPE_MISMATCH.CREATE_NAMED_STRUCT_WITHOUT_FOLDABLE_STRING] Cannot resolve "struct(split(namedlambdavariable(), =, -1)[0], split(namedlambdavariable(), =, -1)[1])" due to data type mismatch: Only foldable `STRING` expressions are allowed to appear at odd position, but they are ["0", "1"].;
'Project [value#41, key_value#45, transform(key_value#45, lambdafunction(struct(0, split(lambda x_3#49, =, -1)[0], 1, split(lambda x_3#49, =, -1)[1]), lambda x_3#49, false)) AS map_entry#48]
+- Project [value#41, split(value#41, ,, -1) AS key_value#45]
   +- LocalRelation [value#41]

  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.dataTypeMismatch(package.scala:73)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5(CheckAnalysis.scala:269)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5$adapted(CheckAnalysis.scala:256)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:295)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:294)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:294)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:294)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:294)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:294)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
```

The cause is that `CreateNamedStruct` uses the last expression of the value `Expression` (here, the extraction key passed to `getItem`) as the field name, and checks that this name is a `String`. For an array element, however, that last expression is an `Integer` index, so the check fails. The fix skips the `UnresolvedExtractValue` match when the last expression is not a `String`, so the field falls back to the default name.

### Why are the changes needed?

To fix the bug when a struct field name is derived from an array index.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a new test.

Closes #41187 from Hisoka-X/SPARK-43522_struct_name_array.
Authored-by: Jia Fan
Signed-off-by: Wenchen Fan
(cherry picked from commit f2a29176de6e0b1628de6ca962cbf5036b145e0a)
Signed-off-by: Wenchen Fan
---
 .../spark/sql/catalyst/expressions/complexTypeCreator.scala | 2 +-
 sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 8
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
index ea871d453b3..20512191312 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
@@ -371,7 +
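The naming rule described in the commit message can be modelled outside Spark. The sketch below is a simplified, hypothetical Python stand-in, not Spark's code: `Literal` and `ExtractValue` are invented classes that play the role of a Catalyst literal and an unresolved `getItem` extraction, and the positional `col1`, `col2`, ... fallback is an assumption of this sketch. With `fixed=False` it mimics the 3.4.0 behaviour, where an integer index was taken as the field name and then rejected by the "names must be strings" check.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass(frozen=True)
class Literal:
    """Hypothetical stand-in for a Catalyst literal (a string key or an int index)."""
    value: Union[str, int]


@dataclass(frozen=True)
class ExtractValue:
    """Hypothetical stand-in for an unresolved `child.getItem(key)` expression."""
    child: str
    key: Literal


Expr = Union[str, ExtractValue]


def struct_field_names(args: List[Expr], fixed: bool = True) -> List[object]:
    """Derive struct field names; `fixed=False` mimics the buggy 3.4.0 behaviour."""
    names: List[object] = []
    for index, expr in enumerate(args):
        if isinstance(expr, ExtractValue) and (not fixed or isinstance(expr.key.value, str)):
            # Reuse the extraction key as the field name.
            names.append(expr.key.value)
        else:
            # Fall back to a positional default name (assumed `colN` pattern).
            names.append(f"col{index + 1}")
    return names


def check_names(names: List[object]) -> None:
    """Mimic the analyzer check: every field name must be a string."""
    bad = [n for n in names if not isinstance(n, str)]
    if bad:
        raise TypeError(f"Only STRING names are allowed, but got {bad}")


args = [ExtractValue("split(x, '=')", Literal(0)), ExtractValue("split(x, '=')", Literal(1))]
check_names(struct_field_names(args))  # passes with ['col1', 'col2']
try:
    check_names(struct_field_names(args, fixed=False))
except TypeError as err:
    print(err)  # Only STRING names are allowed, but got [0, 1]
```

A string key such as `getItem("a")` on a map would still lend its key as the field name; only non-string keys take the fallback path.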
[spark] branch master updated (5e599953889 -> f2a29176de6)
wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

from 5e599953889 [SPARK-43157][SQL] Clone InMemoryRelation cached plan to prevent cloned plan from referencing same objects
add f2a29176de6 [SPARK-43522][SQL] Fix creating struct column name with index of array

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/expressions/complexTypeCreator.scala | 2 +-
 sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 8
 2 files changed, 9 insertions(+), 1 deletion(-)

- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.4 updated: [SPARK-43157][SQL] Clone InMemoryRelation cached plan to prevent cloned plan from referencing same objects
wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 5cc4c5d2912 [SPARK-43157][SQL] Clone InMemoryRelation cached plan to prevent cloned plan from referencing same objects
5cc4c5d2912 is described below

commit 5cc4c5d2912a999e247f6c6f92c8ba858946594a
Author: Rob Reeves
AuthorDate: Thu May 18 14:24:05 2023 +0800

[SPARK-43157][SQL] Clone InMemoryRelation cached plan to prevent cloned plan from referencing same objects

### What changes were proposed in this pull request?

This is the narrowest fix for the issue observed in SPARK-43157. It does not attempt to identify or solve all potential correctness and concurrency issues from TreeNode.tags being modified in multiple places. It solves the issue described in SPARK-43157 by cloning the cached plan when populating `InMemoryRelation.innerChildren`. I chose to do the clone at this point to limit the scope to tree traversal used for building up the string representation of the plan, which is where we se [...]

Another solution I tried was to modify `InMemoryRelation.clone` to create a new `CachedRDDBuilder` and pass in a cloned `cachedPlan`. I opted not to go with this approach because `CachedRDDBuilder` has mutable state that would need to be moved to the new object, and I didn't want to add that complexity if it was not needed.

### Why are the changes needed?

When caching is used, the cached part of the SparkPlan is leaked to new clones of the plan. This leakage is an issue because if the TreeNode.tags are modified in one plan, the change also affects the other plan. This is a correctness issue, and a concurrency issue if the TreeNode.tags are set in different threads for the cloned plans.

See the description of [SPARK-43157](https://issues.apache.org/jira/browse/SPARK-43157) for an example of the concurrency issue.
### Does this PR introduce _any_ user-facing change?

Yes. It fixes a driver hang that users can observe.

### How was this patch tested?

A unit test was added, and I manually verified that `Dataset.explain("formatted")` still produced the expected output.

```scala
spark.range(10).cache.filter($"id" > 5).explain("formatted")

== Physical Plan ==
* Filter (4)
+- InMemoryTableScan (1)
      +- InMemoryRelation (2)
            +- * Range (3)

(1) InMemoryTableScan
Output [1]: [id#0L]
Arguments: [id#0L], [(id#0L > 5)]

(2) InMemoryRelation
Arguments: [id#0L], CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer@418b946b,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) Range (0, 10, step=1, splits=16) ,None), [id#0L ASC NULLS FIRST]

(3) Range [codegen id : 1]
Output [1]: [id#0L]
Arguments: Range (0, 10, step=1, splits=Some(16))

(4) Filter [codegen id : 1]
Input [1]: [id#0L]
Condition : (id#0L > 5)
```

I also verified that `InMemoryRelation.innerChildren` is cloned when the entire plan is cloned.
```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import spark.implicits._

// Depth-first search for the first cache scan operator, including subqueries
def findCacheOperator(plan: SparkPlan): Option[InMemoryTableScanExec] = {
  if (plan.isInstanceOf[InMemoryTableScanExec]) {
    Some(plan.asInstanceOf[InMemoryTableScanExec])
  } else if (plan.children.isEmpty && plan.subqueries.isEmpty) {
    None
  } else {
    (plan.subqueries.flatMap(p => findCacheOperator(p)) ++
      plan.children.flatMap(findCacheOperator)).headOption
  }
}

val df = spark.range(10).filter($"id" < 100).cache()
val df1 = df.limit(1)
val df2 = df.limit(1)

// Get the cache operator (InMemoryTableScanExec) in each plan
val plan1 = findCacheOperator(df1.queryExecution.executedPlan).get
val plan2 = findCacheOperator(df2.queryExecution.executedPlan).get

// Check if InMemoryTableScanExec references point to the same object
println(plan1.eq(plan2)) // returns false

// Check if InMemoryRelation references point to the same object
println(plan1.relation.eq(plan2.relation)) // returns false

// Check if the cached SparkPlan references point to the same object
println(plan1.relation.innerChildren.head.eq(plan2.relation.innerChildren.head)) // returns false
// This shows the issue is fixed
```

Closes #40812 from robreeves/roreeves/explain_util.

Authored-by: Rob Reeves
Signed-off-by: Wenchen Fan
(cherry picked from commit 5e5999538899732bf3cdd04b974f1abeb949ccd0)
Signed-off-by: Wenchen Fan
---
 .../sql/execution/columnar/InMemoryRelation.scala | 12 ++-
 .../execution/columnar/InMemoryRelationSuite.scala | 37
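The leak described in SPARK-43157 comes down to two plan copies sharing one mutable child object. The following Python sketch is only an analogy, not Spark code: the hypothetical `Node` carries a mutable `tags` dict (loosely like `TreeNode.tags`), and the hypothetical `CachedRelation` either hands out the same cached plan to every caller (the pre-fix situation) or a fresh deep copy (roughly what cloning the cached plan for `innerChildren` achieves).

```python
import copy
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class Node:
    """Hypothetical plan node with mutable metadata, loosely analogous to TreeNode.tags."""
    name: str
    tags: Dict[str, str] = field(default_factory=dict)


@dataclass
class CachedRelation:
    """Hypothetical holder of a cached plan, loosely analogous to InMemoryRelation."""
    cached_plan: Node

    def inner_children_shared(self) -> Node:
        # Pre-fix analogy: every caller receives the very same cached plan object.
        return self.cached_plan

    def inner_children_cloned(self) -> Node:
        # Post-fix analogy: every caller receives its own copy of the cached plan.
        return copy.deepcopy(self.cached_plan)


shared = CachedRelation(Node("Range"))
a, b = shared.inner_children_shared(), shared.inner_children_shared()
a.tags["explain_id"] = "1"
print(a is b, b.tags)  # True {'explain_id': '1'}  (the tag leaks into the other "plan")

cloned = CachedRelation(Node("Range"))
c, d = cloned.inner_children_cloned(), cloned.inner_children_cloned()
c.tags["explain_id"] = "1"
print(c is d, d.tags)  # False {}  (each copy keeps its own tags)
```

Deep-copying on every access trades some allocation for isolation, which matches the commit's choice to clone only where the string representation of the plan is built.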
[spark] branch master updated: [SPARK-43157][SQL] Clone InMemoryRelation cached plan to prevent cloned plan from referencing same objects
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new 5e599953889 [SPARK-43157][SQL] Clone InMemoryRelation cached plan to prevent cloned plan from referencing same objects
5e599953889 is described below

commit 5e5999538899732bf3cdd04b974f1abeb949ccd0
Author: Rob Reeves
AuthorDate: Thu May 18 14:24:05 2023 +0800

[SPARK-43157][SQL] Clone InMemoryRelation cached plan to prevent cloned plan from referencing same objects

Closes #40812 from robreeves/roreeves/explain_util.

Authored-by: Rob Reeves
Signed-off-by: Wenchen Fan
---
 .../sql/execution/columnar/InMemoryRelation.scala | 12 ++-
 .../execution/columnar/InMemoryRelationSuite.scala | 37 ++
 2 files changed, 48 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apach
[spark] branch master updated: [SPARK-43413][SQL][FOLLOWUP] Show a directional message in ListQuery nullability assertion
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new 12f956cffd1 [SPARK-43413][SQL][FOLLOWUP] Show a directional message in ListQuery nullability assertion
12f956cffd1 is described below

commit 12f956cffd1dca81a145122310134ec8e57cb5aa
Author: Jack Chen
AuthorDate: Wed May 17 23:02:34 2023 -0700

[SPARK-43413][SQL][FOLLOWUP] Show a directional message in ListQuery nullability assertion

### What changes were proposed in this pull request?

If the assertion in the call to `ListQuery.nullable` is hit, the assertion error message now names the conf flag that can be used to disable the assertion.

Follow-up to https://github.com/apache/spark/pull/41094#discussion_r1195438179

### Why are the changes needed?

Improve error message.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests

Closes #41202 from jchen5/in-nullability-assert.

Authored-by: Jack Chen
Signed-off-by: Dongjoon Hyun
---
 .../scala/org/apache/spark/sql/catalyst/expressions/subquery.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
index b0f10895c17..cd31b94905a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
@@ -372,7 +372,8 @@ case class ListQuery(
     // ListQuery can't be executed alone so its nullability is not defined.
     // Consider using ListQuery.childOutputs.exists(_.nullable)
     if (!SQLConf.get.getConf(SQLConf.LEGACY_IN_SUBQUERY_NULLABILITY)) {
-      assert(false, "ListQuery nullability is not defined")
+      assert(false, "ListQuery nullability is not defined. To restore the legacy behavior before " +
+        s"Spark 3.5.0, set ${SQLConf.LEGACY_IN_SUBQUERY_NULLABILITY.key}=true")
     }
     false
   }
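The behaviour of the patched `nullable` can be summarised by a small Python model. With the legacy flag off, asking for the nullability fails, and the message now tells the user which flag to set. This is only a sketch: `legacy_enabled` and `conf_key` are hypothetical parameters standing in for `SQLConf.get.getConf(...)` and `SQLConf.LEGACY_IN_SUBQUERY_NULLABILITY.key`, whose actual key string does not appear in the diff.

```python
def list_query_nullable(legacy_enabled: bool, conf_key: str) -> bool:
    """Model of the patched ListQuery.nullable: fail loudly unless the legacy flag is set."""
    if not legacy_enabled:
        # Raised explicitly so the check survives `python -O`, unlike a bare `assert`.
        raise AssertionError(
            "ListQuery nullability is not defined. To restore the legacy behavior before "
            f"Spark 3.5.0, set {conf_key}=true"
        )
    # Legacy path: the Scala code simply returns false here.
    return False


try:
    list_query_nullable(False, "hypothetical.legacy.flag")
except AssertionError as err:
    print(err)
```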
[spark] branch master updated: [SPARK-43548][SS] Remove workaround for HADOOP-16255
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new a983591bbbf [SPARK-43548][SS] Remove workaround for HADOOP-16255
a983591bbbf is described below

commit a983591bbbf3ecd7b99b26d6b55590836121857b
Author: panbingkun
AuthorDate: Wed May 17 22:41:53 2023 -0700

[SPARK-43548][SS] Remove workaround for HADOOP-16255

### What changes were proposed in this pull request?

This PR aims to remove the workaround for HADOOP-16255.
https://issues.apache.org/jira/browse/HADOOP-16255

### Why are the changes needed?

- HADOOP-16255 was fixed in Hadoop releases after 3.1.2, and Spark supports Hadoop versions >= 3.2.2 or >= 3.3.1, so the workaround is no longer needed.
- Clean up the code.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass GitHub Actions.

Closes #41209 from panbingkun/SPARK-43548.

Authored-by: panbingkun
Signed-off-by: Dongjoon Hyun
---
 .../sql/execution/streaming/CheckpointFileManager.scala | 14 --
 1 file changed, 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 6df0a2f3063..ad3212871fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -370,20 +370,6 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
   override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
     import Options.Rename._
     fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
-    // TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is resolved
-    mayRemoveCrcFile(srcPath)
-  }
-
-  private def mayRemoveCrcFile(path: Path): Unit = {
-    try {
-      val checksumFile = new Path(path.getParent, s".${path.getName}.crc")
-      if (exists(checksumFile)) {
-        // checksum file exists, deleting it
-        delete(checksumFile)
-      }
-    } catch {
-      case NonFatal(_) => // ignore, we are removing crc file as "best-effort"
-    }
   }
 }
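For reference, the removed workaround computed a checksum sidecar path of the form `.<name>.crc` in the same parent directory as the renamed file and deleted it on a best-effort basis. A rough Python equivalent on a local directory is sketched below; using `pathlib` in place of Hadoop's `FileContext` is an assumption of this sketch, and the function names are hypothetical.

```python
from pathlib import Path


def checksum_file_for(path: Path) -> Path:
    """Mirror `new Path(path.getParent, s".${path.getName}.crc")`."""
    return path.parent / f".{path.name}.crc"


def may_remove_crc_file(path: Path) -> None:
    """Best-effort removal of the checksum sidecar; OS errors are ignored."""
    try:
        checksum = checksum_file_for(path)
        if checksum.exists():
            # Checksum file exists, delete it.
            checksum.unlink()
    except OSError:
        pass  # best effort, like the `case NonFatal(_) =>` branch
```

Narrowing the catch to `OSError` is the closest Python analogue to catching only non-fatal exceptions in this file-system context.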
[spark] branch master updated: [SPARK-43509][PYTHON][CONNECT][FOLLOW-UP] Set SPARK_CONNECT_MODE_ENABLED when running pyspark shell with remote is local
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new 32bdf8a5b52 [SPARK-43509][PYTHON][CONNECT][FOLLOW-UP] Set SPARK_CONNECT_MODE_ENABLED when running pyspark shell with remote is local
32bdf8a5b52 is described below

commit 32bdf8a5b52ab1370dd009cd5967042c7ac20446
Author: Hyukjin Kwon
AuthorDate: Thu May 18 14:35:39 2023 +0900

[SPARK-43509][PYTHON][CONNECT][FOLLOW-UP] Set SPARK_CONNECT_MODE_ENABLED when running pyspark shell with remote is local

### What changes were proposed in this pull request?

This PR is a follow-up of https://github.com/apache/spark/pull/41013. It sets `SPARK_CONNECT_MODE_ENABLED` when the PySpark shell is run via `bin/pyspark --remote=local`, so that the shell works.

### Why are the changes needed?

The shell has been broken since that PR:

```bash
./bin/pyspark --remote local
```

```python
...
Traceback (most recent call last):
  File "/.../python/pyspark/shell.py", line 78, in <module>
    sc = spark.sparkContext
  File "/.../python/pyspark/sql/connect/session.py", line 567, in sparkContext
    raise PySparkNotImplementedError(
pyspark.errors.exceptions.base.PySparkNotImplementedError: [NOT_IMPLEMENTED] sparkContext() is not implemented.
```

### Does this PR introduce _any_ user-facing change?

No for end users. Yes for developers, as described above.

### How was this patch tested?

Manually tested via `./bin/pyspark --remote local`.

Closes #41206 from HyukjinKwon/SPARK-43509-followup.
Authored-by: Hyukjin Kwon
Signed-off-by: Hyukjin Kwon
---
 .../org/apache/spark/launcher/SparkSubmitCommandBuilder.java | 1 +
 python/pyspark/shell.py | 9 +
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index 289eb31db31..9618673b4ce 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -363,6 +363,7 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
     }
     if (remoteStr != null) {
       env.put("SPARK_REMOTE", remoteStr);
+      env.put("SPARK_CONNECT_MODE_ENABLED", "1");
     }
 
     if (!isEmpty(pyOpts)) {
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index 86f576a3029..7e2093c1d31 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -100,10 +100,11 @@ print(
     % (platform.python_version(), platform.python_build()[0], platform.python_build()[1])
 )
 if is_remote():
-    print(
-        "Client connected to the Spark Connect server at %s"
-        % urlparse(os.environ["SPARK_REMOTE"]).netloc
-    )
+    url = os.environ.get("SPARK_REMOTE", None)
+    assert url is not None
+    if url.startswith("local"):
+        url = "sc://localhost"  # only for display in the console.
+    print("Client connected to the Spark Connect server at %s" % urlparse(url).netloc)
 else:
     print("Spark context Web UI available at %s" % (sc.uiWebUrl))  # type: ignore[union-attr]
     print(
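The `local` special case in `shell.py` is needed because `urllib.parse.urlparse` only fills in `netloc` when the URL has a `//` authority part, so a bare `local` value would print an empty host in the banner. A standalone sketch of the new display logic follows; the function name `display_host` and the example host are hypothetical.

```python
from urllib.parse import urlparse


def display_host(spark_remote: str) -> str:
    """Return the host shown in the shell banner for a given SPARK_REMOTE value."""
    url = spark_remote
    if url.startswith("local"):
        url = "sc://localhost"  # only for display, as in shell.py
    return urlparse(url).netloc


print(repr(urlparse("local").netloc))          # ''
print(display_host("local"))                   # localhost
print(display_host("sc://example.com:15002"))  # example.com:15002
```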
[spark] branch branch-3.4 updated (f89e52037bf -> b0f5a7abda7)
gurwls223 pushed a change to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git

from f89e52037bf [SPARK-42826][3.4][FOLLOWUP][PS][DOCS] Update migration notes for pandas API on Spark
add b0f5a7abda7 [SPARK-43547][3.4][PS][DOCS] Update "Supported Pandas API" page to point out the proper pandas docs

No new revisions were added by this update.

Summary of changes:
 python/pyspark/pandas/supported_api_gen.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
[spark] branch branch-3.4 updated: [SPARK-42826][3.4][FOLLOWUP][PS][DOCS] Update migration notes for pandas API on Spark
gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
new f89e52037bf [SPARK-42826][3.4][FOLLOWUP][PS][DOCS] Update migration notes for pandas API on Spark
f89e52037bf is described below

commit f89e52037bf6fcd560fe4698de427b57ae1c36eb
Author: itholic
AuthorDate: Thu May 18 12:02:26 2023 +0900

[SPARK-42826][3.4][FOLLOWUP][PS][DOCS] Update migration notes for pandas API on Spark

### What changes were proposed in this pull request?

This is a follow-up to https://github.com/apache/spark/pull/40459 that fixes incorrect information and describes the changes in more detail.

- We don't fully support pandas 2.0.0, so the statement "Pandas API on Spark follows for the pandas 2.0" is not correct.
- We should list all the APIs that no longer support the `inplace` parameter.

### Why are the changes needed?

Correctness for migration notes.

### Does this PR introduce _any_ user-facing change?

No, only updating migration notes.

### How was this patch tested?

The existing CI should pass.

Closes #41207 from itholic/migration_guide_followup.

Authored-by: itholic
Signed-off-by: Hyukjin Kwon
---
 python/docs/source/migration_guide/pyspark_upgrade.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst b/python/docs/source/migration_guide/pyspark_upgrade.rst
index 7513d64ef6c..f570f8e9dfb 100644
--- a/python/docs/source/migration_guide/pyspark_upgrade.rst
+++ b/python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -33,8 +33,8 @@ Upgrading from PySpark 3.3 to 3.4
 * In Spark 3.4, the ``Series.concat`` sort parameter will be respected to follow pandas 1.4 behaviors.
 * In Spark 3.4, the ``DataFrame.__setitem__`` will make a copy and replace pre-existing arrays, which will NOT be over-written to follow pandas 1.4 behaviors.
 * In Spark 3.4, the ``SparkSession.sql`` and the Pandas on Spark API ``sql`` have got new parameter ``args`` which provides binding of named parameters to their SQL literals.
-* In Spark 3.4, Pandas API on Spark follows for the pandas 2.0, and some APIs were deprecated or removed in Spark 3.4 according to the changes made in pandas 2.0. Please refer to the [release notes of pandas](https://pandas.pydata.org/docs/dev/whatsnew/) for more details.
 * In Spark 3.4, the custom monkey-patch of ``collections.namedtuple`` was removed, and ``cloudpickle`` was used by default. To restore the previous behavior for any relevant pickling issue of ``collections.namedtuple``, set ``PYSPARK_ENABLE_NAMEDTUPLE_PATCH`` environment variable to ``1``.
+* In Spark 3.4, the ``inplace`` parameter is no longer supported for Pandas API on Spark API ``add_categories``, ``remove_categories``, ``remove_unused_categories``, ``rename_categories``, ``reorder_categories``, ``set_categories`` to follow pandas 2.0.0 behaviors.
 
 Upgrading from PySpark 3.2 to 3.3
[spark] branch master updated: [SPARK-43022][CONNECT] Support protobuf functions for Scala client
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new 2dd112ec137 [SPARK-43022][CONNECT] Support protobuf functions for Scala client
2dd112ec137 is described below

commit 2dd112ec1372346ff573e5c273cc5cb93bda033a
Author: yangjie01
AuthorDate: Thu May 18 10:06:05 2023 +0800

[SPARK-43022][CONNECT] Support protobuf functions for Scala client

### What changes were proposed in this pull request?

This PR aims to support protobuf functions for the Scala client.

### Why are the changes needed?

Add Spark Connect JVM client API coverage.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Add new test
- Checked Scala 2.13

Closes #40654 from LuciferYang/protobuf-functions.

Lead-authored-by: yangjie01
Co-authored-by: YangJie
Signed-off-by: yangjie01
---
 .../org/apache/spark/sql/protobuf/functions.scala | 202 +
 .../org/apache/spark/sql/FunctionTestSuite.scala | 11 ++
 .../apache/spark/sql/PlanGenerationTestSuite.scala | 63 +++
 .../src/test/resources/protobuf-tests/common.desc | 12 ++
 .../from_protobuf_messageClassName.explain | 2 +
 ..._protobuf_messageClassName_descFilePath.explain | 2 +
 ...f_messageClassName_descFilePath_options.explain | 2 +
 .../from_protobuf_messageClassName_options.explain | 2 +
 .../to_protobuf_messageClassName.explain | 2 +
 ..._protobuf_messageClassName_descFilePath.explain | 2 +
 ...f_messageClassName_descFilePath_options.explain | 2 +
 .../to_protobuf_messageClassName_options.explain | 2 +
 .../queries/from_protobuf_messageClassName.json | 29 +++
 .../from_protobuf_messageClassName.proto.bin | Bin 0 -> 125 bytes
 ...rom_protobuf_messageClassName_descFilePath.json | 33
 ...rotobuf_messageClassName_descFilePath.proto.bin | Bin 0 -> 156 bytes
 ...obuf_messageClassName_descFilePath_options.json | 46 +
 ...messageClassName_descFilePath_options.proto.bin | Bin 0 -> 206 bytes
 .../from_protobuf_messageClassName_options.json | 42 +
 ...rom_protobuf_messageClassName_options.proto.bin | Bin 0 -> 174 bytes
 .../queries/to_protobuf_messageClassName.json | 29 +++
 .../queries/to_protobuf_messageClassName.proto.bin | Bin 0 -> 123 bytes
 .../to_protobuf_messageClassName_descFilePath.json | 33
 ...rotobuf_messageClassName_descFilePath.proto.bin | Bin 0 -> 154 bytes
 ...obuf_messageClassName_descFilePath_options.json | 46 +
 ...messageClassName_descFilePath_options.proto.bin | Bin 0 -> 204 bytes
 .../to_protobuf_messageClassName_options.json | 42 +
 .../to_protobuf_messageClassName_options.proto.bin | Bin 0 -> 172 bytes
 connector/connect/server/pom.xml | 6 +
 .../sql/connect/planner/SparkConnectPlanner.scala | 59 ++
 .../sql/protobuf/CatalystDataToProtobuf.scala | 2 +-
 .../sql/protobuf/ProtobufDataToCatalyst.scala | 2 +-
 32 files changed, 671 insertions(+), 2 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala
new file mode 100644
index 000..c42f8417155
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/protobuf/functions.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.protobuf
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.functions.{fnWithOptions, lit}
+
+// scalastyle:off: object.name
+object functions {
+  // scalastyle:on: object.name
+
+  /**
+   * Converts a binary column of Protobuf format into its corresponding catalyst value. The
+   * Protobuf definition is provided through Protobuf descriptor file.
+   *
+   * @param data
+   *   the binary column.
+   * @param messageName
+   *   the pro
[spark] branch master updated: [SPARK-43436][BUILD] Upgrade rocksdbjni to 8.1.1.1
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new e8c347b5499 [SPARK-43436][BUILD] Upgrade rocksdbjni to 8.1.1.1
e8c347b5499 is described below

commit e8c347b549901bec5c0a1f5d5d3e5430adbe69b5
Author: YangJie
AuthorDate: Wed May 17 13:30:08 2023 -0700

[SPARK-43436][BUILD] Upgrade rocksdbjni to 8.1.1.1

### What changes were proposed in this pull request?

This PR aims to upgrade rocksdbjni from 8.0.0 to 8.1.1.1.

### Why are the changes needed?

The full release notes are as follows:

- https://github.com/facebook/rocksdb/releases/tag/v8.1.1

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Pass GitHub Actions
- Manual test `RocksDBBenchmark`:

**8.0.0**

```
[INFO] Running org.apache.spark.util.kvstore.RocksDBBenchmark
                                      count  mean    min     max      95th
dbClose                               4      0.339   0.256   0.532    0.532
dbCreation                            4      69.637  4.128   268.371  268.371
naturalIndexCreateIterator            1024   0.005   0.002   1.581    0.006
naturalIndexDescendingCreateIterator  1024   0.005   0.005   0.067    0.007
naturalIndexDescendingIteration       1024   0.006   0.004   0.029    0.008
naturalIndexIteration                 1024   0.006   0.004   0.069    0.009
randomDeleteIndexed                   1024   0.026   0.018   0.200    0.035
randomDeletesNoIndex                  1024   0.015   0.012   0.050    0.018
randomUpdatesIndexed                  1024   0.088   0.033   33.234   0.096
randomUpdatesNoIndex                  1024   0.035   0.032   0.497    0.039
randomWritesIndexed                   1024   0.129   0.035   56.362   0.121
randomWritesNoIndex                   1024   0.041   0.034   1.415    0.047
refIndexCreateIterator                1024   0.005   0.004   0.029    0.006
refIndexDescendingCreateIterator      1024   0.003   0.003   0.028    0.004
refIndexDescendingIteration           1024   0.007   0.005   0.044    0.008
refIndexIteration                     1024   0.008   0.005   0.367    0.010
sequentialDeleteIndexed               1024   0.023   0.017   1.716    0.025
sequentialDeleteNoIndex               1024   0.015   0.012   0.043    0.017
sequentialUpdatesIndexed              1024   0.045   0.037   0.943    0.054
sequentialUpdatesNoIndex              1024   0.041   0.029   0.838    0.053
sequentialWritesIndexed               1024   0.051   0.043   2.945    0.056
sequentialWritesNoIndex               1024   0.037   0.030   2.828    0.042
```

**8.1.1.1**

```
[INFO] Running org.apache.spark.util.kvstore.RocksDBBenchmark
                                      count  mean    min     max      95th
dbClose                               4      0.319   0.233   0.514    0.514
dbCreation                            4      69.826  3.767   268.894  268.894
naturalIndexCreateIterator            1024   0.005   0.002   1.409    0.007
naturalIndexDescendingCreateIterator  1024   0.005   0.005   0.062    0.007
naturalIndexDescendingIteration       1024   0.006   0.004   0.026    0.008
naturalIndexIteration                 1024   0.007   0.004   0.182    0.011
randomDeleteIndexed                   1024   0.027   0.020   0.259    0.037
randomDeletesNoIndex                  1024   0.016   0.013   0.041    0.019
randomUpdatesIndexed                  1024   0.087   0.033   29.514   0.102
randomUpdatesNoIndex                  1024   0.036   0.033   0.495    0.040
randomWritesIndexed                   1024   0.120   0.033   53.330   0.126
randomWritesNoIndex                   1024   0.041   0.035   1.403    0.047
refIndexCreateIterator                1024   0.006   0.005   0.019    0.007
refIndexDescendingCreateIterator      1024   0.003   0.003   0.028    0.005
refIndexDescendingIteration           1024   0.007   0.005   0.053    0.008
refIndexIteration                     1024   0.008   0.006   0.254    0.010
sequentialDeleteIndexed               1024   0.023   0.018   1.237    0.027
sequentialDeleteNoIndex               1024   0.015   0.013   0.043    0.018
seque
[spark] branch master updated: [SPARK-43537][INFA][BUILD] Upgrading the ASM dependencies used in the `tools` module to 9.4
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new 9785353684b [SPARK-43537][INFA][BUILD] Upgrading the ASM dependencies used in the `tools` module to 9.4
9785353684b is described below

commit 9785353684bdc2a2c7445b7e6b9ab85154f6933f
Author: yangjie01
AuthorDate: Wed May 17 11:18:14 2023 -0500

[SPARK-43537][INFA][BUILD] Upgrading the ASM dependencies used in the `tools` module to 9.4

### What changes were proposed in this pull request?

This PR aims to upgrade the ASM-related dependencies in the `tools` module from version 7.1 to version 9.4, so that `GenerateMIMAIgnore` can process Java 17+ compiled code. Additionally, this PR defines `asm.version` to manage the ASM version in one place.

### Why are the changes needed?

The classpath processed by `GenerateMIMAIgnore` currently cannot contain Java 17+ compiled code, because the ASM version used by the `tools` module is too old. https://github.com/bmc/classutil has not been updated for a long time, so the problem cannot be solved by upgrading `classutil`. As a workaround, this PR makes the `tools` module depend explicitly on ASM 9.4.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?
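The reproduction below fails with `Unsupported class file major version 61`. Some background on that message: every class file begins with the magic number `0xCAFEBABE`, followed by a two-byte minor version and a two-byte major version. Major version 61 corresponds to Java 17, and for Java 5 and later the Java release equals the major version minus 44. The following is a minimal sketch of reading that header. `ClassFileVersion` is a hypothetical helper and is not part of Spark, ASM, or classutil. It could be used to check whether a class on the `GenerateMIMAIgnore` classpath was compiled for Java 17+.

```scala
import java.io.{DataInputStream, InputStream}

// Hypothetical helper for illustration; not part of Spark, ASM, or classutil.
object ClassFileVersion {
  // Every class file starts with this 4-byte magic number.
  private val Magic: Int = 0xCAFEBABEL.toInt

  /** Reads the class-file header from `in` and returns (major, minor). */
  def read(in: InputStream): (Int, Int) = {
    val data = new DataInputStream(in)
    val magic = data.readInt()
    require(magic == Magic, f"not a class file (magic 0x$magic%08X)")
    val minor = data.readUnsignedShort() // u2 minor_version comes first
    val major = data.readUnsignedShort() // u2 major_version follows
    (major, minor)
  }

  /** Maps a major version to its Java release; valid for Java 5 (major 49) and later. */
  def javaRelease(major: Int): Int = major - 44

  /** True when the class file requires a Java 17+ runtime (major version 61 or higher). */
  def isJava17OrLater(major: Int): Boolean = major >= 61
}
```

For example, the header bytes `CA FE BA BE 00 00 00 3D` yield major version 61, which maps to Java release 17.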
- Pass GitHub Actions
- Manually checked `dev/mima`, because this PR upgrades a dependency of the `tools` module:

```
dev/mima
```

and

```
dev/change-scala-version.sh 2.13
dev/mima -Pscala-2.13
```

- A case that reproduces the problem: run the following script on the master branch:

```
set -o pipefail
set -e

FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
cd "$FWDIR"
export SPARK_HOME=$FWDIR
echo $SPARK_HOME

if [[ -x "$JAVA_HOME/bin/java" ]]; then
  JAVA_CMD="$JAVA_HOME/bin/java"
else
  JAVA_CMD=java
fi

TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | grep jar | tail -n1)"
ASSEMBLY_CLASSPATH="$(build/sbt -DcopyDependencies=false "export assembly/fullClasspath" | grep jar | tail -n1)"

rm -f .generated-mima*

$JAVA_CMD \
  -Xmx2g \
  -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.util.jar=ALL-UNNAMED \
  -cp "$TOOLS_CLASSPATH:$ASSEMBLY_CLASSPATH" \
  org.apache.spark.tools.GenerateMIMAIgnore

rm -f .generated-mima*
```

**Before**

```
Exception in thread "main" java.lang.IllegalArgumentException: Unsupported class file major version 61
	at org.objectweb.asm.ClassReader.(ClassReader.java:195)
	at org.objectweb.asm.ClassReader.(ClassReader.java:176)
	at org.objectweb.asm.ClassReader.(ClassReader.java:162)
	at org.objectweb.asm.ClassReader.(ClassReader.java:283)
	at org.clapper.classutil.asm.ClassFile$.load(ClassFinderImpl.scala:222)
	at org.clapper.classutil.ClassFinder.classData(ClassFinder.scala:404)
	at org.clapper.classutil.ClassFinder.$anonfun$processOpenZip$2(ClassFinder.scala:359)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at scala.collection.Iterator.toStream(Iterator.scala:1417)
	at scala.collection.Iterator.toStream$(Iterator.scala:1416)
	at scala.collection.AbstractIterator.toStream(Iterator.scala:1431)
	at scala.collection.Iterator.$anonfun$toStream$1(Iterator.scala:1417)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
	at scala.collection.immutable.Stream.$anonfun$$plus$plus$1(Stream.scala:372)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
	at scala.collection.immutable.Stream.$anonfun$$plus$plus$1(Stream.scala:372)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
	at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
	at scala.collection.immutable.Stream.filterImpl(Stream.scala:506)
	at scala.collection.immutable.Stream$.$anonfun$filteredTail$1(Stream.scala:1260)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1163)
	at scala.collection.immutable.Stream$.$anonfun$filteredTail$1(Stream.scala:1260)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1173)
	at scala.colle
[spark] branch master updated (8287a003512 -> 4dbcac95d5a)
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

from 8287a003512 [MINOR][DOC] Refine doc for `Column.over`
add 4dbcac95d5a [SPARK-43535][BUILD] Adjust the ImportOrderChecker rule to resolve long-standing import order issues

No new revisions were added by this update.

Summary of changes:
 .../main/scala/org/apache/spark/sql/application/ConnectRepl.scala | 6 --
 .../src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala | 2 +-
 .../jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 2 +-
 .../apache/spark/sql/connect/client/SparkConnectClientSuite.scala | 3 ++-
 .../main/scala/org/apache/spark/sql/connect/common/UdfPacket.scala | 3 ++-
 .../spark/sql/connect/service/SparkConnectAddArtifactsHandler.scala | 6 --
 .../main/scala/org/apache/spark/metrics/source/JVMCPUSource.scala | 5 +++--
 .../test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala | 3 ++-
 .../src/test/scala/org/apache/spark/ml/linalg/MatricesSuite.scala | 3 ++-
 .../org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala | 4 ++--
 .../spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala | 5 +++--
 .../scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala | 3 ++-
 .../test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala | 3 ++-
 .../cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala | 3 ++-
 .../scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala | 3 ++-
 .../scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala | 3 ++-
 .../spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala | 3 ++-
 scalastyle-config.xml | 2 +-
 .../org/apache/spark/sql/catalyst/parser/ParserUtilsSuite.scala | 3 ++-
 19 files changed, 41 insertions(+), 24 deletions(-)

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
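The summary above lists only the touched files. It does not show the ImportOrderChecker rule itself. As a rough, hypothetical illustration of what an import-order rule checks, the sketch below assumes four import groups in this order: java/javax, scala, third-party, and `org.apache.spark`. That grouping is an assumption modeled on the import layout common in Spark sources and is not copied from `scalastyle-config.xml`. The sketch only compares groups. A real rule may also check ordering within a group and the blank lines between groups.

```scala
// Hypothetical sketch; the group order is an assumption, not Spark's actual scalastyle config.
object ImportOrderSketch {
  /** Group rank: 0 = java/javax, 1 = scala, 2 = third-party, 3 = org.apache.spark. */
  def groupRank(importPath: String): Int =
    if (importPath.startsWith("java.") || importPath.startsWith("javax.")) 0
    else if (importPath.startsWith("scala.")) 1
    else if (importPath.startsWith("org.apache.spark.")) 3
    else 2

  /** Returns the first import whose group should have come before its predecessor's group. */
  def firstOutOfOrder(importLines: Seq[String]): Option[String] = {
    val paths = importLines.map(_.trim.stripPrefix("import ").trim)
    // Compare each adjacent pair; a lower rank after a higher one breaks the order.
    paths.zip(paths.drop(1)).collectFirst {
      case (prev, next) if groupRank(next) < groupRank(prev) => next
    }
  }
}
```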
[spark] branch master updated (d44e073f0cd -> 8287a003512)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

from d44e073f0cd [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api
add 8287a003512 [MINOR][DOC] Refine doc for `Column.over`

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/column.py | 14 ++
 1 file changed, 10 insertions(+), 4 deletions(-)
[spark] branch master updated: [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new d44e073f0cd [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api
d44e073f0cd is described below

commit d44e073f0cdaf16028a4854e79db200a4e39a6fe
Author: yangjie01
AuthorDate: Wed May 17 16:27:03 2023 +0900

[SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala Api

### What changes were proposed in this pull request?

This PR makes `recentProgress` and `lastProgress` in `RemoteStreamingQuery` return `StreamingQueryProgress` instances, consistent with the native Scala API.

### Why are the changes needed?

To extend the Spark Connect JVM client's API coverage.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a new check to `StreamingQuerySuite`.

Closes #40892 from LuciferYang/SPARK-43128.
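The client-side part of this change appears in the `StreamingQuery.scala` hunk below. It replaces `json => new StreamingQueryProgress(json)` with a companion factory, `StreamingQueryProgress.fromJson`, which is passed directly to `.map`. The following is a minimal sketch of that pattern. `Progress`, its two fields, and the regex-based `fromJson` are hypothetical stand-ins. Judging by the Jackson imports in the `progress.scala` hunk, the real class relies on a JSON library instead. The regexes here handle only flat JSON containing these two keys.

```scala
import scala.util.matching.Regex

// Hypothetical stand-in for a progress object that is only ever built from JSON.
final class Progress private (val id: String, val batchId: Long)

object Progress {
  // Naive field extractors for flat JSON; not a general-purpose JSON parser.
  private val IdField: Regex = "\"id\"\\s*:\\s*\"([^\"]*)\"".r
  private val BatchIdField: Regex = "\"batchId\"\\s*:\\s*(-?\\d+)".r

  /** Factory method: the constructor stays private, so callers go through `fromJson`. */
  def fromJson(json: String): Progress = {
    val id = IdField.findFirstMatchIn(json).map(_.group(1))
      .getOrElse(throw new IllegalArgumentException(s"missing id field in: $json"))
    val batchId = BatchIdField.findFirstMatchIn(json).map(_.group(1).toLong)
      .getOrElse(throw new IllegalArgumentException(s"missing batchId field in: $json"))
    new Progress(id, batchId)
  }
}
```

`fromJson` is a plain single-argument method, so `jsonStrings.map(Progress.fromJson)` type-checks through eta-expansion. This mirrors the shape of the Spark change while keeping construction behind the factory.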
Lead-authored-by: yangjie01
Co-authored-by: YangJie
Co-authored-by: Wenchen Fan
Co-authored-by: Ruifeng Zheng
Co-authored-by: Jungtaek Lim
Signed-off-by: Jungtaek Lim
---
 .../spark/sql/streaming/StreamingQuery.scala | 4 +-
 .../org/apache/spark/sql/streaming/progress.scala | 310 -
 .../CheckConnectJvmClientCompatibility.scala | 5 -
 .../streaming/StreamingQueryProgressSuite.scala | 227 +++
 .../spark/sql/streaming/StreamingQuerySuite.scala | 23 ++
 .../sql/connect/planner/SparkConnectPlanner.scala | 5 +-
 .../org/apache/spark/sql/streaming/progress.scala | 17 ++
 .../StreamingQueryStatusAndProgressSuite.scala | 58
 8 files changed, 638 insertions(+), 11 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index b57e6239b77..ceb096b9aff 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -205,14 +205,14 @@ class RemoteStreamingQuery(
 
   override def recentProgress: Array[StreamingQueryProgress] = {
     executeQueryCmd(_.setRecentProgress(true)).getRecentProgress.getRecentProgressJsonList.asScala
-      .map(json => new StreamingQueryProgress(json))
+      .map(StreamingQueryProgress.fromJson)
       .toArray
   }
 
   override def lastProgress: StreamingQueryProgress = {
     executeQueryCmd(
       _.setLastProgress(true)).getRecentProgress.getRecentProgressJsonList.asScala.headOption
-      .map(json => new StreamingQueryProgress(json))
+      .map(StreamingQueryProgress.fromJson)
       .orNull
   }
 
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 974bcd64b29..593311efb9c 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -17,6 +17,312 @@ package org.apache.spark.sql.streaming
-class StreamingQueryProgress private[sql] (val json: String) {
-  // TODO(SPARK-43128): (Implement full object by parsing from json).
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue}
+import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
+
+/**
+ * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
+ */
+@Evolving
+class StateOperatorProgress private[spark] (
+    val operatorName: String,
+    val numRowsTotal: Long,
+    val numRowsUpdated: Long,
+    val allUpdatesTimeMs: Long,
+    val numRowsRemoved: Long,
+    val allRemovalsTimeMs: Long,
+    val commitTimeMs: Long,
+    val memoryUsedBytes: Long,
+    val numRowsDr