[spark] branch master updated: [SPARK-38723][SS][TESTS] Add test for streaming query resume race condition
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7d7afb06f68 [SPARK-38723][SS][TESTS] Add test for streaming query resume race condition 7d7afb06f68 is described below commit 7d7afb06f682c10f3900eb8adeab9fad6d49cb24 Author: Phil Dakin AuthorDate: Thu Oct 26 14:24:09 2023 +0900 [SPARK-38723][SS][TESTS] Add test for streaming query resume race condition ### What changes were proposed in this pull request? Add a test for the CONCURRENT_QUERY error raised when multiple sessions try to simultaneously resume the same streaming query from checkpoint. ### Why are the changes needed? Improve testing coverage per https://issues.apache.org/jira/browse/SPARK-38723. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Change is itself a test - ran locally and confirmed the suite passes. ``` [info] All tests passed. [success] Total time: 129 s (02:09), completed Oct 17, 2023, 2:11:34 PM ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43405 from PhilDakin/pdakin.SPARK-38723. Authored-by: Phil Dakin Signed-off-by: Jungtaek Lim --- .../sql/errors/QueryExecutionErrorsSuite.scala | 48 ++ 1 file changed, 48 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala index 78bbabb1a3f..fb1d05f2a9a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala @@ -25,6 +25,7 @@ import java.util.{Locale, Properties, ServiceConfigurationError} import org.apache.hadoop.fs.{LocalFileSystem, Path} import org.apache.hadoop.fs.permission.FsPermission import org.mockito.Mockito.{mock, spy, when} +import org.scalatest.time.SpanSugar._ import org.apache.spark._ import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, QueryTest, Row, SaveMode} @@ -49,6 +50,7 @@ import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects} import org.apache.spark.sql.streaming.StreamingQueryException import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{DataType, DecimalType, LongType, MetadataBuilder, StructType} +import org.apache.spark.util.ThreadUtils import org.apache.spark.util.Utils class QueryExecutionErrorsSuite @@ -876,6 +878,52 @@ class QueryExecutionErrorsSuite assert(e.getCause.isInstanceOf[NullPointerException]) } + test("CONCURRENT_QUERY: streaming query is resumed from many sessions") { +failAfter(90 seconds) { + withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "true") { +withTempDir { dir => + val ds = spark.readStream.format("rate").load() + + // Queries have the same ID when they are resumed from the same checkpoint. + val chkLocation = new File(dir, "_checkpoint").getCanonicalPath + val dataLocation = new File(dir, "data").getCanonicalPath + + // Run an initial query to setup the checkpoint. + val initialQuery = ds.writeStream.format("parquet") +.option("checkpointLocation", chkLocation).start(dataLocation) + + // Error is thrown due to a race condition. Ensure it happens with high likelihood in the + // test by spawning many threads. 
+ val exceptions = ThreadUtils.parmap(Seq.range(1, 50), "QueryExecutionErrorsSuite", 50) +{ _ => + var exception = None : Option[SparkConcurrentModificationException] + try { +val restartedQuery = ds.writeStream.format("parquet") + .option("checkpointLocation", chkLocation).start(dataLocation) +restartedQuery.stop() +restartedQuery.awaitTermination() + } catch { +case e: SparkConcurrentModificationException => + exception = Some(e) + } + exception +} + assert(exceptions.map(e => e.isDefined).reduceLeft(_ || _)) + exceptions.map { e => +if (e.isDefined) { + checkError( +e.get, +errorClass = "CONCURRENT_QUERY", +sqlState = Some("0A000") + ) +} + } + spark.streams.active.foreach(_.stop()) +} + } +} + } + test("UNSUPPORTED_EXPR_FOR_WINDOW: to_date is not supported with WINDOW") { withTable("t") { sql("CREATE TABLE t(c String) USING parquet") ---
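A minimal, self-contained sketch of the race this new test exercises (not the committed suite code, which uses Spark's internal `ThreadUtils.parmap` and test helpers): several threads restart a streaming query from the same checkpoint, and with `spark.sql.streaming.stopActiveRunOnRestart` enabled at least one restart is expected to fail with the `CONCURRENT_QUERY` error class. Paths, the thread count, and the final assertion are illustrative assumptions.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

import org.apache.spark.SparkConcurrentModificationException
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[4]").getOrCreate()
val ds = spark.readStream.format("rate").load()

val checkpoint = "/tmp/chk"   // assumed scratch locations
val output = "/tmp/data"

// The initial run creates the checkpoint; every restart below shares its query id.
ds.writeStream.format("parquet")
  .option("checkpointLocation", checkpoint).start(output)

val attempts = Future.traverse((1 to 20).toList) { _ =>
  Future {
    try {
      val q = ds.writeStream.format("parquet")
        .option("checkpointLocation", checkpoint).start(output)
      q.stop()
      None
    } catch {
      case e: SparkConcurrentModificationException => Some(e.getErrorClass)
    }
  }
}

// At least one concurrent restart is expected to hit the CONCURRENT_QUERY error class.
assert(Await.result(attempts, 90.seconds).flatten.contains("CONCURRENT_QUERY"))
spark.streams.active.foreach(_.stop())
```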
[spark] branch master updated: [SPARK-45663][CORE][MLLIB] Replace `IterableOnceOps#aggregate` with `IterableOnceOps#foldLeft`
This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a3146c83d98 [SPARK-45663][CORE][MLLIB] Replace `IterableOnceOps#aggregate` with `IterableOnceOps#foldLeft` a3146c83d98 is described below commit a3146c83d98fe76aeb6880a40b61fcdd257685ce Author: yangjie01 AuthorDate: Thu Oct 26 13:20:56 2023 +0800 [SPARK-45663][CORE][MLLIB] Replace `IterableOnceOps#aggregate` with `IterableOnceOps#foldLeft` ### What changes were proposed in this pull request? This pr replace `IterableOnceOps#aggregate` with `IterableOnceOps#foldLeft` due to `aggregate` has been marked as deprecated since Scala 2.13.0. ```scala deprecated("`aggregate` is not relevant for sequential collections. Use `foldLeft(z)(seqop)` instead.", "2.13.0") def aggregate[B](z: => B)(seqop: (B, A) => B, combop: (B, B) => B): B = foldLeft(z)(seqop) ``` ### Why are the changes needed? Clean up deprecated API usage. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions ### Was this patch authored or co-authored using generative AI tooling? No Closes #43527 from LuciferYang/SPARK-45663. Authored-by: yangjie01 Signed-off-by: yangjie01 --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 5 ++--- .../scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala | 2 +- .../scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala | 5 ++--- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index c6770c77b92..5dc666c62d1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1219,8 +1219,7 @@ abstract class RDD[T: ClassTag]( // Clone the zero value since we will also be serializing it as part of tasks var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance()) val cleanSeqOp = sc.clean(seqOp) -val cleanCombOp = sc.clean(combOp) -val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp) +val aggregatePartition = (it: Iterator[T]) => it.foldLeft(zeroValue)(cleanSeqOp) val mergeResult = (_: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult) sc.runJob(this, aggregatePartition, mergeResult) jobResult @@ -1258,7 +1257,7 @@ abstract class RDD[T: ClassTag]( val cleanSeqOp = context.clean(seqOp) val cleanCombOp = context.clean(combOp) val aggregatePartition = -(it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp) +(it: Iterator[T]) => it.foldLeft(zeroValue)(cleanSeqOp) var partiallyAggregated: RDD[U] = mapPartitions(it => Iterator(aggregatePartition(it))) var numPartitions = partiallyAggregated.partitions.length val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2) diff --git a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala index ce46fc8f201..f08cf44e4e1 100644 --- a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala @@ -69,7 +69,7 @@ private[spark] object StratifiedSamplingUtils extends Logging { val rng = new RandomDataGenerator() rng.reSeed(seed + partition) val seqOp = getSeqOp(withReplacement, 
fractions, rng, counts) - Iterator(iter.aggregate(zeroU)(seqOp, combOp)) + Iterator(iter.foldLeft(zeroU)(seqOp)) } mappedPartitionRDD.reduce(combOp) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala index cbe2776f664..2b86c7cd344 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/AreaUnderCurve.scala @@ -78,9 +78,8 @@ private[evaluation] object AreaUnderCurve { * @param curve an iterator over ordered 2D points stored in pairs representing a curve */ def of(curve: Iterable[(Double, Double)]): Double = { -curve.iterator.sliding(2).withPartial(false).aggregate(0.0)( - seqop = (auc: Double, points: Seq[(Double, Double)]) => auc + trapezoid(points), - combop = _ + _ +curve.iterator.sliding(2).withPartial(false).foldLeft(0.0)( + op = (auc: Double, points: Seq[(Double, Double)]) => auc + trapezoid(points) ) } } --
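The substitution is safe because, as the deprecation note quoted above states, `aggregate` on a sequential collection never uses `combop` and is already defined as `foldLeft(z)(seqop)`. A small standalone illustration of the before/after shape (not taken from the patch):

```scala
val xs = List(1, 2, 3, 4)
val seqop = (acc: Long, x: Int) => acc + x

// Before (deprecated since Scala 2.13.0; combop is ignored on sequential collections):
//   xs.aggregate(0L)(seqop, _ + _)
// After:
val sum = xs.foldLeft(0L)(seqop)   // 10L
```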
[spark] branch master updated: [SPARK-44407][BUILD][TESTS] Clean up the compilation warnings related to `it will become a keyword in Scala 3` and prohibit using these keywords as variable names
This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 35c628d9b27 [SPARK-44407][BUILD][TESTS] Clean up the compilation warnings related to `it will become a keyword in Scala 3` and prohibit use these keywords as variable name 35c628d9b27 is described below commit 35c628d9b27aee9263bf43254cd839da69da9f28 Author: yangjie01 AuthorDate: Thu Oct 26 13:18:23 2023 +0800 [SPARK-44407][BUILD][TESTS] Clean up the compilation warnings related to `it will become a keyword in Scala 3` and prohibit use these keywords as variable name ### What changes were proposed in this pull request? This pr clean up the compilation warnings related to `it will become a keyword in Scala 3`, additionally, to facilitate future Scala version migration, a new compiler option has been added to prohibit the use of these keywords as variable names. ### Why are the changes needed? There are some literals, such as `enum`, `given`, `export`, etc., using them as variable names in Scala 2.13 will trigger compilation warnings, but this will become a compilation error in Scala 3. **Scala 2.13** ``` Welcome to Scala 2.13.12 (OpenJDK 64-Bit Server VM, Java 17.0.8). Type in expressions for evaluation. Or try :help. scala> val enum: Int = 1 ^ warning: Wrap `enum` in backticks to use it as an identifier, it will become a keyword in Scala 3. [quickfixable] val enum: Int = 1 scala> val export: Int = 1 ^ warning: Wrap `export` in backticks to use it as an identifier, it will become a keyword in Scala 3. [quickfixable] val export: Int = 1 scala> val given: Int = 1 ^ warning: Wrap `given` in backticks to use it as an identifier, it will become a keyword in Scala 3. [quickfixable] val given: Int = 1 ``` **Scala 3** ``` Welcome to Scala 3.3.1 (17.0.8, Java OpenJDK 64-Bit Server VM). Type in expressions for evaluation. Or try :help. scala> val enum: Int = 1 -- [E032] Syntax Error: 1 |val enum: Int = 1 | |pattern expected | | longer explanation available when compiling with `-explain` scala> val export: Int = 1 -- [E032] Syntax Error: 1 |val export: Int = 1 |^^ |pattern expected | | longer explanation available when compiling with `-explain` scala> val given: Int = 1 -- [E040] Syntax Error: 1 |val given: Int = 1 | ^ | an identifier expected, but ':' found | | longer explanation available when compiling with `-explain` ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions ### Was this patch authored or co-authored using generative AI tooling? No Closes #43529 from LuciferYang/SPARK-44407. Authored-by: yangjie01 Signed-off-by: yangjie01 --- pom.xml | 6 ++ project/SparkBuild.scala| 6 +- .../org/apache/spark/sql/catalyst/JavaTypeInferenceSuite.scala | 4 ++-- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index e3f3b2fe9a1..6488918326f 100644 --- a/pom.xml +++ b/pom.xml @@ -3008,6 +3008,12 @@ SPARK-45627 Symbol literals are deprecated in Scala 2.13 and it's a compile error in Scala 3. 
--> -Wconf:cat=deprecation&msg=symbol literal is deprecated:e + + -Wconf:cat=deprecation&msg=it will become a keyword in Scala 3:e -Xss128m diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 6e87cab6df8..098a628ba1c 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -265,7 +265,11 @@ object SparkBuild extends PomBuild { // Or use `-Wconf:msg=legacy-binding:s` to silence this warning. [quickfixable]" "-Wconf:msg=legacy-binding:s", // SPARK-45627 Symbol literals are deprecated in Scala 2.13 and it's a compile error in Scala 3. -"-Wconf:cat=deprecation&msg=symbol literal is deprecated:e" +"-Wconf:cat=deprecation&msg=symbol literal is deprecated:e", +// SPARK-45627 `enum`, `export` and `given` will become keywords in Scala 3, +// so they are prohibited from being used as variable names in Scala 2.13 to +// reduce the cost of migration in subsequent
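A brief illustration of what the new `-Wconf` rule enforces (illustrative snippet, not part of the patch): the bare identifiers below only warn under Scala 2.13 but now fail the Spark build, while the backtick-quoted forms compile under both Scala 2.13 and Scala 3.

```scala
// Each of these is a warning in Scala 2.13 and a hard error in Scala 3,
// and the new compiler option turns the 2.13 warning into an error as well:
// val enum: Int = 1
// val export: Int = 2
// val given: Int = 3

// The backtick-quoted identifiers remain legal in both Scala versions:
val `enum`: Int = 1
val `export`: Int = 2
val `given`: Int = 3
```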
[spark] branch master updated: [SPARK-45665][INFRA] Uses different `ORACLE_DOCKER_IMAGE_NAME` in the scheduled builds in other branches
This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8cdcfd262f9 [SPARK-45665][INFRA] Uses different `ORACLE_DOCKER_IMAGE_NAME` in the scheduled builds in other branches 8cdcfd262f9 is described below commit 8cdcfd262f9fd46fb9a8e1ceb0bccefe452582bd Author: yangjie01 AuthorDate: Thu Oct 26 11:22:41 2023 +0800 [SPARK-45665][INFRA] Uses different `ORACLE_DOCKER_IMAGE_NAME` in the scheduled builds in other branches ### What changes were proposed in this pull request? https://github.com/apache/spark/pull/43123 upgraded the version of Oracle used for testing. As the daily test will reuse `build_and_test.yml`, the branch-3.x will also use the new version. However, due to the lack of synchronized code changes, the `OracleIntegrationSuite` in the `Docker integration tests` cannot pass the test during the daily test of branch-3.x: - branch-3.3: https://github.com/apache/spark/actions/runs/6609791712/job/17950549755 - branch-3.4: https://github.com/apache/spark/actions/runs/6611049884/job/17954225189 - branch-3.5: https://github.com/apache/spark/actions/runs/6612344747/job/17958021656 So this PR adds the ORACLE_DOCKER_IMAGE_NAME environment variable to the daily test yml file for branch-3.x and uses the previous version of the Oracle Docker image that can pass the test successfully. ### Why are the changes needed? Restore the daily test for branch-3.x. ### Does this PR introduce _any_ user-facing change? No,dev-only ### How was this patch tested? Monitor the daily test results. ### Was this patch authored or co-authored using generative AI tooling? No Closes #43496 from LuciferYang/oracl-docker-image-name. 
Lead-authored-by: yangjie01 Co-authored-by: YangJie Signed-off-by: yangjie01 --- .github/workflows/build_and_test.yml | 1 + .github/workflows/build_branch33.yml | 3 ++- .github/workflows/build_branch34.yml | 3 ++- .github/workflows/build_branch35.yml | 3 ++- 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 13e751bcaa7..5825185f344 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -982,6 +982,7 @@ jobs: distribution: zulu java-version: ${{ inputs.java }} - name: Run tests + env: ${{ fromJSON(inputs.envs) }} run: | ./dev/run-tests --parallelism 1 --modules docker-integration-tests --included-tags org.apache.spark.tags.DockerTest - name: Upload test results to report diff --git a/.github/workflows/build_branch33.yml b/.github/workflows/build_branch33.yml index 63a950447f5..fc6ce7028fc 100644 --- a/.github/workflows/build_branch33.yml +++ b/.github/workflows/build_branch33.yml @@ -37,7 +37,8 @@ jobs: envs: >- { "SCALA_PROFILE": "scala2.13", - "PYTHON_TO_TEST": "" + "PYTHON_TO_TEST": "", + "ORACLE_DOCKER_IMAGE_NAME": "gvenzl/oracle-xe:21.3.0" } jobs: >- { diff --git a/.github/workflows/build_branch34.yml b/.github/workflows/build_branch34.yml index 740b68b69e3..deb43d82c97 100644 --- a/.github/workflows/build_branch34.yml +++ b/.github/workflows/build_branch34.yml @@ -37,7 +37,8 @@ jobs: envs: >- { "SCALA_PROFILE": "scala2.13", - "PYTHON_TO_TEST": "" + "PYTHON_TO_TEST": "", + "ORACLE_DOCKER_IMAGE_NAME": "gvenzl/oracle-xe:21.3.0" } jobs: >- { diff --git a/.github/workflows/build_branch35.yml b/.github/workflows/build_branch35.yml index 15ee66a9ce4..9e6fe13c020 100644 --- a/.github/workflows/build_branch35.yml +++ b/.github/workflows/build_branch35.yml @@ -37,7 +37,8 @@ jobs: envs: >- { "SCALA_PROFILE": "scala2.13", - "PYTHON_TO_TEST": "" + "PYTHON_TO_TEST": "", + "ORACLE_DOCKER_IMAGE_NAME": "gvenzl/oracle-xe:21.3.0" } jobs: >- { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (dcbe275543e -> 25c74d0d4e2)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from dcbe275543e [SPARK-45634][PS] Remove `DataFrame.get_dtype_counts` from Pandas API on Spark add 25c74d0d4e2 [SPARK-45635][PYTHON][TESTS] Cleanup unused import for PySpark testing No new revisions were added by this update. Summary of changes: python/pyspark/pandas/tests/computation/test_corrwith.py | 1 - python/pyspark/pandas/tests/data_type_ops/test_num_arithmetic.py | 1 - python/pyspark/pandas/tests/data_type_ops/testing_utils.py | 1 - python/pyspark/pandas/tests/frame/test_constructor.py| 1 - python/pyspark/pandas/tests/groupby/test_groupby.py | 2 +- python/pyspark/pandas/tests/indexes/test_base_slow.py| 2 +- python/pyspark/pandas/tests/indexes/test_datetime_property.py| 1 - python/pyspark/pandas/tests/series/test_as_type.py | 1 - python/pyspark/pandas/tests/series/test_cumulative.py| 1 - python/pyspark/pandas/tests/series/test_series.py| 9 + python/pyspark/pandas/tests/test_dataframe_spark_io.py | 2 -- python/pyspark/pandas/tests/test_frame_resample.py | 7 --- python/pyspark/pandas/tests/test_ops_on_diff_frames.py | 2 -- python/pyspark/pandas/tests/test_resample.py | 2 +- python/pyspark/pandas/tests/test_series_resample.py | 7 --- python/pyspark/pandas/tests/test_spark_functions.py | 5 - python/pyspark/testing/connectutils.py | 1 - python/pyspark/testing/utils.py | 2 +- 18 files changed, 5 insertions(+), 43 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-45634][PS] Remove `DataFrame.get_dtype_counts` from Pandas API on Spark
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new dcbe275543e [SPARK-45634][PS] Remove `DataFrame.get_dtype_counts` from Pandas API on Spark dcbe275543e is described below commit dcbe275543e05cb4529317ddb933d09253d65d6f Author: Haejoon Lee AuthorDate: Thu Oct 26 11:16:36 2023 +0900 [SPARK-45634][PS] Remove `DataFrame.get_dtype_counts` from Pandas API on Spark ### What changes were proposed in this pull request? This PR proposes to remove old API `get_dtype_counts` from Pandas API on Spark ### Why are the changes needed? This API was deprecated a long time ago, but has not been removed since it's internally used in our code base. But it's no longer used in anywhere currently. ### Does this PR introduce _any_ user-facing change? `DataFrame.get_dtype_counts` is removed. ### How was this patch tested? No new test is required for API removal. The existing CI should pass. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43488 from itholic/SPARK-45634. Authored-by: Haejoon Lee Signed-off-by: Hyukjin Kwon --- .../source/migration_guide/pyspark_upgrade.rst | 1 + python/pyspark/pandas/generic.py | 51 -- 2 files changed, 1 insertion(+), 51 deletions(-) diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst b/python/docs/source/migration_guide/pyspark_upgrade.rst index 933fa936f70..20fab578504 100644 --- a/python/docs/source/migration_guide/pyspark_upgrade.rst +++ b/python/docs/source/migration_guide/pyspark_upgrade.rst @@ -53,6 +53,7 @@ Upgrading from PySpark 3.5 to 4.0 * In Spark 4.0, ``col_space`` parameter from ``DataFrame.to_latex`` and ``Series.to_latex`` has been removed from pandas API on Spark. * In Spark 4.0, ``DataFrame.to_spark_io`` has been removed from pandas API on Spark, use ``DataFrame.spark.to_spark_io`` instead. * In Spark 4.0, ``Series.is_monotonic`` and ``Index.is_monotonic`` have been removed from pandas API on Spark, use ``Series.is_monotonic_increasing`` or ``Index.is_monotonic_increasing`` instead respectively. +* In Spark 4.0, ``DataFrame.get_dtype_counts`` has been removed from pandas API on Spark, use ``DataFrame.dtypes.value_counts()`` instead. Upgrading from PySpark 3.3 to 3.4 diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py index c6f1b9ccbb7..16eaeb6142e 100644 --- a/python/pyspark/pandas/generic.py +++ b/python/pyspark/pandas/generic.py @@ -19,13 +19,11 @@ A base class of DataFrame/Column to behave like pandas DataFrame/Series. """ from abc import ABCMeta, abstractmethod -from collections import Counter from functools import reduce from typing import ( Any, Callable, Dict, -Iterable, IO, List, Optional, @@ -400,55 +398,6 @@ class Frame(object, metaclass=ABCMeta): """ return self._apply_series_op(lambda psser: psser._cumprod(skipna), should_resolve=True) -# TODO: Although this has removed pandas >= 1.0.0, but we're keeping this as deprecated -# since we're using this for `DataFrame.info` internally. -# We can drop it once our minimal pandas version becomes 1.0.0. -def get_dtype_counts(self) -> pd.Series: -""" -Return counts of unique dtypes in this object. - -.. deprecated:: 0.14.0 - -Returns ---- -dtype: pd.Series -Series with the count of columns with each dtype. - -See Also - -dtypes: Return the dtypes in this object. 
- -Examples - ->>> a = [['a', 1, 1], ['b', 2, 2], ['c', 3, 3]] ->>> df = ps.DataFrame(a, columns=['str', 'int1', 'int2']) ->>> df - str int1 int2 -0 a 1 1 -1 b 2 2 -2 c 3 3 - ->>> df.get_dtype_counts().sort_values() -object1 -int64 2 -dtype: int64 - ->>> df.str.get_dtype_counts().sort_values() -object1 -dtype: int64 -""" -warnings.warn( -"`get_dtype_counts` has been deprecated and will be " -"removed in a future version. For DataFrames use " -"`.dtypes.value_counts()", -FutureWarning, -) -if not isinstance(self.dtypes, Iterable): -dtypes = [self.dtypes] -else: -dtypes = list(self.dtypes) -return pd.Series(dict(Counter([d.name for d in dtypes]))) - def pipe(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: r""" Apply func(self, \*args, \*\*kwargs). -
[spark] branch master updated: [SPARK-45651][BUILD] Log memory usage of publish snapshot workflow
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e57a16d7af1 [SPARK-45651][BUILD] Log memory usage of publish snapshot workflow e57a16d7af1 is described below commit e57a16d7af14d1ce0c14d01dd220c63acb98517d Author: Enrico Minack AuthorDate: Thu Oct 26 10:39:32 2023 +0900 [SPARK-45651][BUILD] Log memory usage of publish snapshot workflow ### What changes were proposed in this pull request? This logs memory consumption while publishing snapshots. This is to investigate whether the suspected high memory usage is the root cause of `publish_snapshots` failures for master. Merging this after #43512 allows to run this manually. ### Why are the changes needed? The working assumption is that high memory usage is the root cause. This logging should provide proof or disproof for this assumption. This can be reverted once more is known or SPARK-45651 is fixed. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Locally ### Was this patch authored or co-authored using generative AI tooling? No Closes #43513 from EnricoMi/publish-snapshot-log-memory. Authored-by: Enrico Minack Signed-off-by: Hyukjin Kwon --- .github/workflows/publish_snapshot.yml | 10 +- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/.github/workflows/publish_snapshot.yml b/.github/workflows/publish_snapshot.yml index 6d55f1afed0..3354ab88a39 100644 --- a/.github/workflows/publish_snapshot.yml +++ b/.github/workflows/publish_snapshot.yml @@ -70,4 +70,12 @@ jobs: GPG_KEY: "not_used" GPG_PASSPHRASE: "not_used" GIT_REF: ${{ matrix.branch }} - run: ./dev/create-release/release-build.sh publish-snapshot + run: | +while true +do + date + top -b -n 1 -i + sleep 1 + echo +done | sed "s/^/mem: /" & +./dev/create-release/release-build.sh publish-snapshot - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-45136][CONNECT] Enhance ClosureCleaner with Ammonite support
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 3e2b146eb81 [SPARK-45136][CONNECT] Enhance ClosureCleaner with Ammonite support 3e2b146eb81 is described below commit 3e2b146eb81d9a5727f07b58f7bb1760a71a8697 Author: Vsevolod Stepanov AuthorDate: Wed Oct 25 21:35:07 2023 -0400 [SPARK-45136][CONNECT] Enhance ClosureCleaner with Ammonite support ### What changes were proposed in this pull request? This PR enhances existing ClosureCleaner implementation to support cleaning closures defined in Ammonite. Please refer to [this gist](https://gist.github.com/vsevolodstep-db/b8e4d676745d6e2d047ecac291e5254c) to get more context on how Ammonite code wrapping works and what problems I'm trying to solve here. Overall, it contains these logical changes in `ClosureCleaner`: 1. Making it recognize and clean closures defined in Ammonite (previously it was checking if capturing class name starts with `$line` and ends with `$iw`, which is native Scala REPL specific thing 2. Making it clean closures if they are defined inside a user class in a REPL (see corner case 1 in the gist) 3. Making it clean nested closures properly for Ammonite REPL (see corner case 2 in the gist) 4. Making it transitively follow other Ammonite commands that are captured by the target closure. Please note that `cleanTransitively` option of `ClosureCleaner.clean()` method refers to following references transitively within enclosing command object, but it doesn't follow other command objects. As we need `ClosureCleaner` to be available in Spark Connect, I also moved the implementation to `common-utils` module. This brings a new `xbean-asm9-shaded` which is fairly small. Also, this PR moves `checkSerializable` check from `ClosureCleaner` to `SparkClosureCleaner`, as it is specific to Spark core The important changes affect `ClosureCleaner` only. They should not affect existing codepath for normal Scala closures / closures defined in a native Scala REPL and cover only closures defined in Ammonite. Also, this PR modifies SparkConnect's `UserDefinedFunction` to actually use `ClosureCleaner` and clean closures in SparkConnect ### Why are the changes needed? To properly support closures defined in Ammonite, reduce UDF payload size and avoid possible `NonSerializable` exceptions. This includes: - lambda capturing outer command object, leading in a circular dependency - lambda capturing other command objects transitively, exploding payload size ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests. New tests in `ReplE2ESuite` covering various scenarios using SparkConnect + Ammonite REPL to make sure closures are actually cleaned. ### Was this patch authored or co-authored using generative AI tooling? No Closes #42995 from vsevolodstep-db/SPARK-45136/closure-cleaner. 
Authored-by: Vsevolod Stepanov Signed-off-by: Herman van Hovell --- common/utils/pom.xml | 4 + .../org/apache/spark/util/ClosureCleaner.scala | 636 ++--- .../org/apache/spark/util/SparkStreamUtils.scala | 109 .../sql/expressions/UserDefinedFunction.scala | 10 +- .../spark/sql/application/ReplE2ESuite.scala | 143 + .../CheckConnectJvmClientCompatibility.scala | 8 + core/pom.xml | 4 - .../main/scala/org/apache/spark/SparkContext.scala | 2 +- .../apache/spark/util/SparkClosureCleaner.scala| 49 ++ .../main/scala/org/apache/spark/util/Utils.scala | 85 +-- .../apache/spark/util/ClosureCleanerSuite.scala| 2 +- .../apache/spark/util/ClosureCleanerSuite2.scala | 4 +- project/MimaExcludes.scala | 4 +- .../catalyst/encoders/ExpressionEncoderSuite.scala | 4 +- .../org/apache/spark/streaming/StateSpec.scala | 6 +- 15 files changed, 756 insertions(+), 314 deletions(-) diff --git a/common/utils/pom.xml b/common/utils/pom.xml index 37d1ea48d97..44cb30a19ff 100644 --- a/common/utils/pom.xml +++ b/common/utils/pom.xml @@ -39,6 +39,10 @@ org.apache.spark spark-tags_${scala.binary.version} + + org.apache.xbean + xbean-asm9-shaded + com.fasterxml.jackson.core jackson-databind diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala similarity index 61% rename from core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala rename to common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 29fb0206f90..ffa2f0e60b2 1
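The problem the enhanced cleaner targets can be pictured without any REPL: a lambda that reads an instance field captures the whole enclosing instance, and in Ammonite that enclosing instance is the command wrapper object, which transitively references every previous command. Below is a hedged, standalone sketch of the capture problem and of the effect cleaning has; it is not the `ClosureCleaner` implementation.

```scala
class Pipeline {
  // Stand-in for non-serializable or heavyweight state living next to the closure.
  private val log = new java.io.ByteArrayOutputStream()
  private val factor = 10

  // `factor` is an instance field, so this lambda closes over `this`; serializing it
  // (into a task or a Spark Connect UDF payload) would drag the whole Pipeline along.
  val scale: Int => Int = x => x * factor

  // Roughly what closure cleaning achieves automatically: only the needed value is captured.
  val scaleCleaned: Int => Int = {
    val f = factor
    x => x * f
  }
}
```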
[spark] branch master updated: [SPARK-45661][SQL][PYTHON] Add toNullable in StructType, MapType and ArrayType
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0219eb5984f [SPARK-45661][SQL][PYTHON] Add toNullable in StructType, MapType and ArrayType 0219eb5984f is described below commit 0219eb5984f0f4a7209deb091b713ded10aebba3 Author: Hyukjin Kwon AuthorDate: Thu Oct 26 09:30:59 2023 +0900 [SPARK-45661][SQL][PYTHON] Add toNullable in StructType, MapType and ArrayType ### What changes were proposed in this pull request? This PR proposes to add: - `StructType.toNullable` - `MapType.toNullable` - `ArrayType.toNullable` that returns a nullable schema. ### Why are the changes needed? See https://stackoverflow.com/questions/33193958/change-nullable-property-of-column-in-spark-dataframe as an example. ### Does this PR introduce _any_ user-facing change? Yes, it adds new API in both Scala and Python: - `StructType.toNullable` - `MapType.toNullable` - `ArrayType.toNullable` ### How was this patch tested? For Scala, it just adds an alias. For Python side, doctests were added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43523 from HyukjinKwon/SPARK-45661. Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/types.py| 124 + .../org/apache/spark/sql/types/ArrayType.scala | 8 ++ .../scala/org/apache/spark/sql/types/MapType.scala | 8 ++ .../org/apache/spark/sql/types/StructType.scala| 8 ++ 4 files changed, 148 insertions(+) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 01db75b2500..d6862d7178a 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -139,6 +139,9 @@ class DataType: """ return obj +def _as_nullable(self) -> "DataType": +return self + @classmethod def fromDDL(cls, ddl: str) -> "DataType": """ @@ -593,6 +596,41 @@ class ArrayType(DataType): def simpleString(self) -> str: return "array<%s>" % self.elementType.simpleString() +def _as_nullable(self) -> "ArrayType": +return ArrayType(self.elementType._as_nullable(), containsNull=True) + +def toNullable(self) -> "ArrayType": +""" +Returns the same data type but set all nullability fields are true +(`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`). + +.. versionadded:: 4.0.0 + +Returns +--- +:class:`ArrayType` + +Examples + +Example 1: Simple nullability conversion + +>>> ArrayType(IntegerType(), containsNull=False).toNullable() +ArrayType(IntegerType(), True) + +Example 2: Nested nullability conversion + +>>> ArrayType( +... StructType([ +... StructField("b", IntegerType(), nullable=False), +... StructField("c", ArrayType(IntegerType(), containsNull=False)) +... ]), +... containsNull=False +... 
).toNullable() +ArrayType(StructType([StructField('b', IntegerType(), True), +StructField('c', ArrayType(IntegerType(), True), True)]), True) +""" +return self._as_nullable() + def __repr__(self) -> str: return "ArrayType(%s, %s)" % (self.elementType, str(self.containsNull)) @@ -671,6 +709,44 @@ class MapType(DataType): def simpleString(self) -> str: return "map<%s,%s>" % (self.keyType.simpleString(), self.valueType.simpleString()) +def _as_nullable(self) -> "MapType": +return MapType( +self.keyType._as_nullable(), self.valueType._as_nullable(), valueContainsNull=True +) + +def toNullable(self) -> "MapType": +""" +Returns the same data type but set all nullability fields are true +(`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`). + +.. versionadded:: 4.0.0 + +Returns +--- +:class:`MapType` + +Examples + +Example 1: Simple nullability conversion + +>>> MapType(IntegerType(), StringType(), valueContainsNull=False).toNullable() +MapType(IntegerType(), StringType(), True) + +Example 2: Nested nullability conversion + +>>> MapType( +... StringType(), +... MapType( +... IntegerType(), +... ArrayType(IntegerType(), containsNull=False), +... valueContainsNull=False +... ), +... valueContainsNull=False +... ).toNullable() +MapType(StringType(), MapType(IntegerType(),
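A short usage sketch of the Scala side of the new API (assuming Spark 4.0, where the commit adds `toNullable` to `StructType`, `ArrayType` and `MapType`): it returns the same schema with `nullable`, `containsNull` and `valueContainsNull` all set to true, which is convenient when relaxing a strict schema before a union or a write.

```scala
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("tags", ArrayType(StringType, containsNull = false), nullable = false)
))

val relaxed = schema.toNullable
// relaxed("id").nullable    => true
// relaxed("tags").dataType  => ArrayType(StringType, containsNull = true)
```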
[spark-connect-go] branch dependabot/go_modules/google.golang.org/grpc-1.56.3 created (now bcf0c58)
This is an automated email from the ASF dual-hosted git repository. github-bot pushed a change to branch dependabot/go_modules/google.golang.org/grpc-1.56.3 in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git at bcf0c58 Bump google.golang.org/grpc from 1.54.0 to 1.56.3 No new revisions were added by this update. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-45660] Re-use Literal objects in ComputeCurrentTime rule
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 344453761cb [SPARK-45660] Re-use Literal objects in ComputeCurrentTime rule 344453761cb is described below commit 344453761cbca154a04a53d4c5d6c2b1eef59652 Author: Ole Sasse AuthorDate: Wed Oct 25 19:56:15 2023 +0300 [SPARK-45660] Re-use Literal objects in ComputeCurrentTime rule ### What changes were proposed in this pull request? The ComputeCurrentTime optimizer rule does produce unique timestamp Literals for current time expressions of a query. For CurrentDate and LocalTimestamp the Literal objects are not re-used though, but semantically equal objects are created for each instance. This can cost unnecessary much memory in case there are many such Literal objects. This PR adds a map that caches timestamp literals in case they are used more than once. ### Why are the changes needed? A query that has a lot of equal literals could use unnecessary high amounts of memory ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a new Unit Test ### Was this patch authored or co-authored using generative AI tooling? No Closes #43524 from olaky/unique-timestamp-replacement-literals. Authored-by: Ole Sasse Signed-off-by: Max Gekk --- .../sql/catalyst/optimizer/finishAnalysis.scala| 15 --- .../optimizer/ComputeCurrentTimeSuite.scala| 30 +- 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala index 4052ccd6496..18c85999312 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.optimizer -import java.time.{Instant, LocalDateTime} +import java.time.{Instant, LocalDateTime, ZoneId} import org.apache.spark.sql.catalyst.CurrentUserContext import org.apache.spark.sql.catalyst.expressions._ @@ -79,6 +79,8 @@ object ComputeCurrentTime extends Rule[LogicalPlan] { val currentTimestampMicros = instantToMicros(instant) val currentTime = Literal.create(currentTimestampMicros, TimestampType) val timezone = Literal.create(conf.sessionLocalTimeZone, StringType) +val currentDates = collection.mutable.HashMap.empty[ZoneId, Literal] +val localTimestamps = collection.mutable.HashMap.empty[ZoneId, Literal] def transformCondition(treePatternbits: TreePatternBits): Boolean = { treePatternbits.containsPattern(CURRENT_LIKE) @@ -88,12 +90,17 @@ object ComputeCurrentTime extends Rule[LogicalPlan] { case subQuery => subQuery.transformAllExpressionsWithPruning(transformCondition) { case cd: CurrentDate => -Literal.create(DateTimeUtils.microsToDays(currentTimestampMicros, cd.zoneId), DateType) +currentDates.getOrElseUpdate(cd.zoneId, { + Literal.create( +DateTimeUtils.microsToDays(currentTimestampMicros, cd.zoneId), DateType) +}) case CurrentTimestamp() | Now() => currentTime case CurrentTimeZone() => timezone case localTimestamp: LocalTimestamp => -val asDateTime = LocalDateTime.ofInstant(instant, localTimestamp.zoneId) -Literal.create(localDateTimeToMicros(asDateTime), TimestampNTZType) +localTimestamps.getOrElseUpdate(localTimestamp.zoneId, { + val asDateTime = 
LocalDateTime.ofInstant(instant, localTimestamp.zoneId) + Literal.create(localDateTimeToMicros(asDateTime), TimestampNTZType) +}) } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala index 8b76cc383c5..447d77855fb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala @@ -23,7 +23,7 @@ import scala.concurrent.duration._ import scala.jdk.CollectionConverters.MapHasAsScala import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, CurrentTimestamp, CurrentTimeZone, InSubquery, ListQuery, Literal, LocalTimestamp, Now} +import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDate, Curre
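The fix is essentially per-rule memoization: build the literal once per time zone and hand back the same object for every later occurrence in the plan. A standalone sketch of the pattern with `mutable.HashMap.getOrElseUpdate` (the names and the stand-in literal type are illustrative, not the optimizer code):

```scala
import java.time.{LocalDate, ZoneId}
import scala.collection.mutable

final case class DateLit(epochDay: Long)   // stand-in for Catalyst's Literal

val cache = mutable.HashMap.empty[ZoneId, DateLit]

def dateLiteral(zone: ZoneId): DateLit =
  cache.getOrElseUpdate(zone, DateLit(LocalDate.now(zone).toEpochDay))

val a = dateLiteral(ZoneId.of("UTC"))
val b = dateLiteral(ZoneId.of("UTC"))
assert(a eq b)   // one shared instance per zone, mirroring the ComputeCurrentTime change
```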
[spark] branch branch-3.4 updated: [SPARK-40154][PYTHON][DOCS] Correct storage level in Dataframe.cache docstring
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new ecdb69f3db3 [SPARK-40154][PYTHON][DOCS] Correct storage level in Dataframe.cache docstring ecdb69f3db3 is described below commit ecdb69f3db3370aa7cf6ae8a52130379e465ca73 Author: Paul Staab AuthorDate: Wed Oct 25 07:36:15 2023 -0500 [SPARK-40154][PYTHON][DOCS] Correct storage level in Dataframe.cache docstring ### What changes were proposed in this pull request? Corrects the docstring `DataFrame.cache` to give the correct storage level after it changed with Spark 3.0. It seems that the docstring of `DataFrame.persist` was updated, but `cache` was forgotten. ### Why are the changes needed? The doctoring claims that `cache` uses serialised storage, but it actually uses deserialised storage. I confirmed that this is still the case with Spark 3.5.0 using the example code from the Jira ticket. ### Does this PR introduce _any_ user-facing change? Yes, the docstring changes. ### How was this patch tested? The Github actions workflow succeeded. ### Was this patch authored or co-authored using generative AI tooling? No Closes #43229 from paulstaab/SPARK-40154. Authored-by: Paul Staab Signed-off-by: Sean Owen (cherry picked from commit 94607dd001b133a25dc9865f25b3f9e7f5a5daa3) Signed-off-by: Sean Owen --- python/pyspark/sql/dataframe.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 518bc9867d7..14426c51439 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1404,7 +1404,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): self.rdd.foreachPartition(f) # type: ignore[arg-type] def cache(self) -> "DataFrame": -"""Persists the :class:`DataFrame` with the default storage level (`MEMORY_AND_DISK`). +"""Persists the :class:`DataFrame` with the default storage level (`MEMORY_AND_DISK_DESER`). .. versionadded:: 1.3.0 @@ -1413,7 +1413,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): Notes - -The default storage level has changed to `MEMORY_AND_DISK` to match Scala in 2.0. +The default storage level has changed to `MEMORY_AND_DISK_DESER` to match Scala in 3.0. Returns --- - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.5 updated: [SPARK-40154][PYTHON][DOCS] Correct storage level in Dataframe.cache docstring
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 9e4411e2450 [SPARK-40154][PYTHON][DOCS] Correct storage level in Dataframe.cache docstring 9e4411e2450 is described below commit 9e4411e2450d0503933626207b5e03308c30bc72 Author: Paul Staab AuthorDate: Wed Oct 25 07:36:15 2023 -0500 [SPARK-40154][PYTHON][DOCS] Correct storage level in Dataframe.cache docstring ### What changes were proposed in this pull request? Corrects the docstring `DataFrame.cache` to give the correct storage level after it changed with Spark 3.0. It seems that the docstring of `DataFrame.persist` was updated, but `cache` was forgotten. ### Why are the changes needed? The doctoring claims that `cache` uses serialised storage, but it actually uses deserialised storage. I confirmed that this is still the case with Spark 3.5.0 using the example code from the Jira ticket. ### Does this PR introduce _any_ user-facing change? Yes, the docstring changes. ### How was this patch tested? The Github actions workflow succeeded. ### Was this patch authored or co-authored using generative AI tooling? No Closes #43229 from paulstaab/SPARK-40154. Authored-by: Paul Staab Signed-off-by: Sean Owen (cherry picked from commit 94607dd001b133a25dc9865f25b3f9e7f5a5daa3) Signed-off-by: Sean Owen --- python/pyspark/sql/dataframe.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 30ed73d3c47..5707ae2a31f 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1485,7 +1485,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): self.rdd.foreachPartition(f) # type: ignore[arg-type] def cache(self) -> "DataFrame": -"""Persists the :class:`DataFrame` with the default storage level (`MEMORY_AND_DISK`). +"""Persists the :class:`DataFrame` with the default storage level (`MEMORY_AND_DISK_DESER`). .. versionadded:: 1.3.0 @@ -1494,7 +1494,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): Notes - -The default storage level has changed to `MEMORY_AND_DISK` to match Scala in 2.0. +The default storage level has changed to `MEMORY_AND_DISK_DESER` to match Scala in 3.0. Returns --- - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (a073bf38c7d -> 94607dd001b)
This is an automated email from the ASF dual-hosted git repository. srowen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from a073bf38c7d [SPARK-45209][CORE][UI] Flame Graph Support For Executor Thread Dump Page add 94607dd001b [SPARK-40154][PYTHON][DOCS] Correct storage level in Dataframe.cache docstring No new revisions were added by this update. Summary of changes: python/pyspark/sql/dataframe.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
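The fix itself only touches the Python docstring, but the behaviour it now describes can be spot-checked from Scala as well (a sketch assuming an active `spark` session; the expected values reflect the documented defaults, not output captured from the patch): `Dataset.cache()` uses a deserialized `MEMORY_AND_DISK` level, which is what the corrected docstring calls `MEMORY_AND_DISK_DESER` on the Python side.

```scala
val df = spark.range(10).cache()
df.count()                               // materialize the cache
println(df.storageLevel.deserialized)    // expected: true – i.e. not serialized storage
println(df.storageLevel.useDisk)         // expected: true
df.unpersist()
```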
[spark] branch master updated: [SPARK-45209][CORE][UI] Flame Graph Support For Executor Thread Dump Page
This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a073bf38c7d [SPARK-45209][CORE][UI] Flame Graph Support For Executor Thread Dump Page a073bf38c7d is described below commit a073bf38c7d8802e2ab12c54299e1541a48a394e Author: Kent Yao AuthorDate: Wed Oct 25 18:29:43 2023 +0800 [SPARK-45209][CORE][UI] Flame Graph Support For Executor Thread Dump Page ### What changes were proposed in this pull request? This PR draws a CPU Flame Graph by Java stack traces for executors and drivers. Currently, the Java stack traces is just a SNAPSHOT, not sampling at a certain frequency for a period. Sampling might be considered an upcoming feature out of the scope of this PR. ![fg git](https://github.com/apache/spark/assets/8326978/c3f99a1a-78ee-4adb-be1f-e4afd5f307b7) If you are new to flame graphs, there are also some references you can refer to learn about the basic concepts and details. [1] [Flame Graphs](https://www.brendangregg.com/flamegraphs.html) [2] [FLIP-165: Operator's Flame Graphs](https://cwiki.apache.org/confluence/display/FLINK/FLIP-165%3A+Operator%27s+Flame+Graphs) [3] [Java in Flames. mixed-mode flame graphs provide a… | by Netflix Technology Blog](https://netflixtechblog.com/java-in-flames-e763b3d32166) [4] [HProf](https://docs.oracle.com/javase/7/docs/technotes/samples/hprof.html) Pending features This PR mainly focuses on the UI, independent of the profiling steps. What we might have in the future are: - Flame Graph Support For Task Thread Page which SPARK-45151 added - Add `ProfilingExecutor(max, interval)` message to profile whole executor - Add `ProfileTask(taskId, max, interval)` message to profile an certain task - Different views for on/off/full CPUs - Mixed mode profiling, which might rely upon some ext libs at runtime - And so on. ### Why are the changes needed? Performance is always an important design factor in Spark. It is desirable to provide better visibility into the distribution of CPU resources while executing user code alongside the Spark kernel. One of the most visually effective means to do that is [Flame Graphs](http://www.brendangregg.com/FlameGraphs/cpuflamegraphs.html), which visually presents the data gathered by performance profiling tools used by developers for performance tuning their applications. ### Does this PR introduce _any_ user-facing change? yes ### How was this patch tested? locally ### Was this patch authored or co-authored using generative AI tooling? no Closes #42988 from yaooqinn/SPARK-45209. 
Authored-by: Kent Yao Signed-off-by: yangjie01 --- LICENSE| 3 +- LICENSE-binary | 3 +- .../org/apache/spark/ui/static/d3-flamegraph.css | 47 .../apache/spark/ui/static/d3-flamegraph.min.js| 2 + .../org/apache/spark/ui/static/flamegraph.js | 36 .../spark/ui/exec/ExecutorThreadDumpPage.scala | 20 + .../spark/ui/flamegraph/FlamegraphNode.scala | 50 ++ dev/.rat-excludes | 2 + 8 files changed, 161 insertions(+), 2 deletions(-) diff --git a/LICENSE b/LICENSE index 44983fd1259..3216134fd4b 100644 --- a/LICENSE +++ b/LICENSE @@ -216,7 +216,8 @@ core/src/main/resources/org/apache/spark/ui/static/bootstrap* core/src/main/resources/org/apache/spark/ui/static/vis* docs/js/vendor/bootstrap.js connector/spark-ganglia-lgpl/src/main/java/com/codahale/metrics/ganglia/GangliaReporter.java - +core/src/main/resources/org/apache/spark/ui/static/d3-flamegraph.min.js +core/src/main/resources/org/apache/spark/ui/static/d3-flamegraph.css Python Software Foundation License -- diff --git a/LICENSE-binary b/LICENSE-binary index 30fca96a883..c6f291f1108 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -413,7 +413,8 @@ core/src/main/java/org/apache/spark/util/collection/TimSort.java core/src/main/resources/org/apache/spark/ui/static/bootstrap* core/src/main/resources/org/apache/spark/ui/static/vis* docs/js/vendor/bootstrap.js - +core/src/main/resources/org/apache/spark/ui/static/d3-flamegraph.min.js +core/src/main/resources/org/apache/spark/ui/static/d3-flamegraph.css This product bundles various third-party components under other open source licenses. diff --git a/core/src/main/resources/org/apache/spark/ui/static/d3-flamegraph.css b/core/src/main/resources/org/apache/spark/ui/static/d3-flamegraph.css new file mode 100644 index 000
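For context, the core data structure behind such a page is small: each captured stack trace is folded, root frame first, into a tree whose node values are occurrence counts, and that tree is what d3-flamegraph renders. The following is a compact standalone sketch of that idea, not the committed `FlamegraphNode`:

```scala
import scala.collection.mutable
import scala.jdk.CollectionConverters._

final class Node(val name: String) {
  var value: Int = 0
  val children = mutable.LinkedHashMap.empty[String, Node]
}

def buildFlameTree(stacks: Seq[Seq[String]]): Node = {
  val root = new Node("root")
  for (stack <- stacks) {
    root.value += 1
    var cur = root
    for (frame <- stack.reverse) {   // walk from the bottom of the stack upwards
      cur = cur.children.getOrElseUpdate(frame, new Node(frame))
      cur.value += 1
    }
  }
  root
}

// Example input: a snapshot of the current JVM's threads.
val snapshot = Thread.getAllStackTraces.asScala.values.toSeq
  .map(_.toSeq.map(e => s"${e.getClassName}.${e.getMethodName}"))
val tree = buildFlameTree(snapshot)
```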
[spark] branch master updated: [SPARK-45656][SQL] Fix observation when named observations with the same name on different datasets
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7db9b2293fa [SPARK-45656][SQL] Fix observation when named observations with the same name on different datasets 7db9b2293fa is described below commit 7db9b2293fa778073274d235dd72212b75d94073 Author: Takuya UESHIN AuthorDate: Wed Oct 25 16:59:26 2023 +0900 [SPARK-45656][SQL] Fix observation when named observations with the same name on different datasets ### What changes were proposed in this pull request? Fixes observation when named observations with the same name on different datasets. ### Why are the changes needed? Currently if there are observations with the same name on different dataset, one of them will be overwritten by the other execution. For example, ```py >>> observation1 = Observation("named") >>> df1 = spark.range(50) >>> observed_df1 = df1.observe(observation1, count(lit(1)).alias("cnt")) >>> >>> observation2 = Observation("named") >>> df2 = spark.range(100) >>> observed_df2 = df2.observe(observation2, count(lit(1)).alias("cnt")) >>> >>> observed_df1.collect() ... >>> observed_df2.collect() ... >>> observation1.get {'cnt': 50} >>> observation2.get {'cnt': 50} ``` `observation2` should return `{'cnt': 100}`. ### Does this PR introduce _any_ user-facing change? Yes, the observations with the same name will be available if they observe different datasets. ### How was this patch tested? Added the related tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43519 from ueshin/issues/SPARK-45656/observation. Authored-by: Takuya UESHIN Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/tests/test_dataframe.py | 18 ++ .../main/scala/org/apache/spark/sql/Dataset.scala | 2 +- .../scala/org/apache/spark/sql/Observation.scala| 21 + .../scala/org/apache/spark/sql/DatasetSuite.scala | 21 + 4 files changed, 53 insertions(+), 9 deletions(-) diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py index 3c493a8ae3a..0a2e3a53946 100644 --- a/python/pyspark/sql/tests/test_dataframe.py +++ b/python/pyspark/sql/tests/test_dataframe.py @@ -1023,6 +1023,24 @@ class DataFrameTestsMixin: self.assertGreaterEqual(row.cnt, 0) self.assertGreaterEqual(row.sum, 0) +def test_observe_with_same_name_on_different_dataframe(self): +# SPARK-45656: named observations with the same name on different datasets +from pyspark.sql import Observation + +observation1 = Observation("named") +df1 = self.spark.range(50) +observed_df1 = df1.observe(observation1, count(lit(1)).alias("cnt")) + +observation2 = Observation("named") +df2 = self.spark.range(100) +observed_df2 = df2.observe(observation2, count(lit(1)).alias("cnt")) + +observed_df1.collect() +observed_df2.collect() + +self.assertEqual(observation1.get, dict(cnt=50)) +self.assertEqual(observation2.get, dict(cnt=100)) + def test_sample(self): with self.assertRaises(PySparkTypeError) as pe: self.spark.range(1).sample() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 5079cfcca9d..4f07133bb76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -201,7 +201,7 @@ class Dataset[T] private[sql]( } // A globally unique id of this Dataset. 
- private val id = Dataset.curId.getAndIncrement() + private[sql] val id = Dataset.curId.getAndIncrement() queryExecution.assertAnalyzed() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala b/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala index ba40336fc14..14c4983794b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Observation.scala @@ -21,6 +21,7 @@ import java.util.UUID import scala.jdk.CollectionConverters.MapHasAsJava +import org.apache.spark.sql.catalyst.plans.logical.CollectMetrics import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.util.QueryExecutionListener @@ -56,7 +57,7 @@ class Observation(val name: String) { private val listener: ObservationListener = ObservationListener(this) - @volatile private var sparkSession: Option[SparkSession] = None + @volatile private var ds: Option[Dataset[_]] = None @volatile private
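The reproduction above uses the PySpark API; the same scenario expressed with the Scala `Observation` API (a sketch assuming an active `spark` session) shows what the fix guarantees: two observations that share a name but are attached to different Datasets each report their own metrics.

```scala
import org.apache.spark.sql.Observation
import org.apache.spark.sql.functions._

val obs1 = Observation("named")
val df1 = spark.range(50).observe(obs1, count(lit(1)).as("cnt"))

val obs2 = Observation("named")
val df2 = spark.range(100).observe(obs2, count(lit(1)).as("cnt"))

df1.collect()
df2.collect()

obs1.get   // Map("cnt" -> 50)
obs2.get   // Map("cnt" -> 100) with this fix (previously also 50)
```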