(spark) branch branch-3.4 updated: [SPARK-48105][SS][3.5] Fix the race condition between state store unloading and snapshotting
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 090022d475d6 [SPARK-48105][SS][3.5] Fix the race condition between state store unloading and snapshotting

090022d475d6 is described below

commit 090022d475d671ee345f22eb661f644b29ca28c5
Author: Huanli Wang
AuthorDate: Wed May 15 14:52:04 2024 +0900

[SPARK-48105][SS][3.5] Fix the race condition between state store unloading and snapshotting

### What changes were proposed in this pull request?

* When we close the HDFS state store, we should only remove the entry from `loadedMaps` rather than actively cleaning up the data; the JVM GC can reclaim those objects once they are unreferenced.
* We should wait for the maintenance thread to stop before unloading the providers.

### Why are the changes needed?

There are two race conditions between state store snapshotting and state store unloading which could result in query failure and potential data corruption.

Case 1:
1. The maintenance thread pool encounters an issue and calls [stopMaintenanceTask](https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L774), which in turn calls [threadPool.stop](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L587). However, this function doesn't wait for th [...]
2. The provider unload [closes the state store](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L719-L721), which [clears the values of loadedMaps](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L353-L355) for the HDFS-backed state store.
3. If the not-yet-stopped maintenance thread is still running and tries to take the snapshot, the data in the underlying `HDFSBackedStateStoreMap` has already been removed. If this snapshot process completes successfully, we will write corrupted data and the following batches will consume that corrupted data.

Case 2:
1. On executor_1, the maintenance thread is about to snapshot state_store_1. It retrieves the `HDFSBackedStateStoreMap` object from `loadedMaps` and then [releases the lock on loadedMaps](https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L750-L751).
2. state_store_1 is loaded on another executor, e.g. executor_2.
3. Another state store, state_store_2, is loaded on executor_1 and [reportActiveStoreInstance](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L854-L871) is sent to the driver.
4. executor_1 [unloads](https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L713) the state stores that are no longer active, which clears the data entries in the `HDFSBackedStateStoreMap`.
5. The snapshotting thread is terminated and uploads an incomplete snapshot to cloud storage because the [iterator doesn't have a next element](https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L634) after the clear.
6. Future batches consume the corrupted data.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

With this change:
```
[info] Run completed in 2 minutes, 55 seconds.
[info] Total number of tests run: 153
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 153, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 271 s (04:31), completed May 2, 2024, 6:26:33 PM
```

Before this change:
```
[info] - state store unload/close happens during the maintenance *** FAILED *** (648 milliseconds)
[info]   Vector("a1", "a10", "a11", "a12", "a13", "a14", "a15", "a16", "a17", "a18", "a19", "a2", "a20", "a3", "a4", "a5", "a6", "a7", "a8", "a9") did not equal ArrayBuffer("a8") (StateStoreSuite.scala:414)
[info]   Analysis:
[info]   Vector1(0: "a1" -> "a8", 1: "a10" -> , 2: "a11" -> , 3: "a12" -> , 4: "a13" -> , 5: "a14" -> , 6: "a15" -> , 7: "a16" -> , 8: "a17" -> , 9: "a18" -> , 10: "a19" -> , 11: "a2" -> , 12: "a20" -> , 13: "a3" -> , 14: "a4" -> , 15: "a5" -> , 16: "a6" -> , 17: "a7" -> , 18: "a8" -> , 19: "a9" -> )
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at
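To make the two-part fix concrete, here is a minimal, self-contained sketch of the intended behavior. The names (`maintenancePool`, `loadedMaps`, `stopMaintenanceTask`, `closeStore`) are modeled on the linked sources purely for illustration; this is not the actual patch.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable

object StateStoreUnloadSketch {
  // Stand-ins for the real provider state: a pool running snapshot tasks and
  // the version -> state-map entries those tasks snapshot from.
  private val maintenancePool = Executors.newFixedThreadPool(2)
  private val loadedMaps = mutable.HashMap[Long, Map[String, String]]()

  def stopMaintenanceTask(): Unit = {
    maintenancePool.shutdown()
    // Part 1 of the fix: block until in-flight snapshot tasks have finished,
    // so no snapshot can race with the provider unload that follows.
    maintenancePool.awaitTermination(60, TimeUnit.SECONDS)
  }

  def closeStore(version: Long): Unit = synchronized {
    // Part 2 of the fix: only drop the entry. Never clear the map object
    // itself, because a snapshot thread may still hold a reference to it;
    // the now-unreferenced data is reclaimed by JVM GC.
    loadedMaps.remove(version)
  }

  def main(args: Array[String]): Unit = {
    loadedMaps.put(1L, Map("a" -> "1"))
    stopMaintenanceTask() // wait for maintenance to stop first...
    closeStore(1L)        // ...then unload
    println(s"entries left: ${loadedMaps.size}")
  }
}
```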
(spark) branch branch-3.5 updated (c1dd4a5df693 -> 1a454287c01e)
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a change to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git

 from c1dd4a5df693 [SPARK-48297][SQL] Fix a regression TRANSFORM clause with char/varchar
  add 1a454287c01e [SPARK-48294][SQL][3.5] Handle lowercase in nestedTypeMissingElementTypeError

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/sql/errors/QueryParsingErrors.scala  |  2 +-
 .../spark/sql/errors/QueryParsingErrorsSuite.scala    | 19 +++
 2 files changed, 20 insertions(+), 1 deletion(-)
(spark) branch master updated: [SPARK-47818][CONNECT][FOLLOW-UP] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 51623785c38c [SPARK-47818][CONNECT][FOLLOW-UP] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests

51623785c38c is described below

commit 51623785c38c9b17a6d91cb8e7f686459bd4803e
Author: Xi Lyu
AuthorDate: Fri May 17 22:06:35 2024 +0200

[SPARK-47818][CONNECT][FOLLOW-UP] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests

### What changes were proposed in this pull request?

In [this previous PR](https://github.com/apache/spark/pull/46012), we introduced two new confs for the plan cache: a static conf `spark.connect.session.planCache.maxSize` and a dynamic conf `spark.connect.session.planCache.enabled`. The plan cache is enabled by default with size 5. In this PR, we mark both confs as internal because we don't expect users to deal with them.

### Why are the changes needed?

These two confs are not expected to be used under normal circumstances, and we don't need to document them on the Spark Configuration reference page.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46638 from xi-db/SPARK-47818-plan-cache-followup2.

Authored-by: Xi Lyu
Signed-off-by: Herman van Hovell
---
 .../src/main/scala/org/apache/spark/sql/connect/config/Connect.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index f9d604712420..91f2e23a8b64 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -279,6 +279,7 @@ object Connect {
       .doc("Sets the maximum number of cached resolved logical plans in Spark Connect Session." +
         " If set to a value less or equal than zero will disable the plan cache.")
       .version("4.0.0")
+      .internal()
       .intConf
       .createWithDefault(5)
 
@@ -289,6 +290,7 @@ object Connect {
       s" When false, the cache is disabled even if '${CONNECT_SESSION_PLAN_CACHE_SIZE.key}' is" +
         " greater than zero. The caching is best-effort and not guaranteed.")
       .version("4.0.0")
+      .internal()
       .booleanConf
       .createWithDefault(true)
 }
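For illustration, a minimal sketch of toggling the dynamic conf from a Spark Connect Scala client. The conf keys are the ones shown in the diff above; the endpoint URL is an assumption.

```scala
import org.apache.spark.sql.SparkSession

object PlanCacheConfSketch {
  def main(args: Array[String]): Unit = {
    // Assumed endpoint; any running Spark Connect server works.
    val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate()

    // Dynamic conf: can still be flipped per session even though it is now
    // marked internal (internal only removes it from the docs page).
    spark.conf.set("spark.connect.session.planCache.enabled", "false")
    println(spark.conf.get("spark.connect.session.planCache.enabled"))

    // The static conf `spark.connect.session.planCache.maxSize` cannot be set
    // at runtime; it must be supplied at server startup, e.g.
    //   --conf spark.connect.session.planCache.maxSize=5
    spark.stop()
  }
}
```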
(spark) branch master updated (15fb4787354a -> 3edd6c7e1d50)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

 from 15fb4787354a [SPARK-48321][CONNECT][TESTS] Avoid using deprecated methods in dsl
  add 3edd6c7e1d50 [SPARK-48312][SQL] Improve Alias.removeNonInheritableMetadata performance

No new revisions were added by this update.

Summary of changes:
 .../main/scala/org/apache/spark/sql/types/Metadata.scala   |  7 +++
 .../spark/sql/catalyst/expressions/namedExpressions.scala  | 14 +++---
 2 files changed, 18 insertions(+), 3 deletions(-)
(spark) branch master updated: [SPARK-48321][CONNECT][TESTS] Avoid using deprecated methods in dsl
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 15fb4787354a [SPARK-48321][CONNECT][TESTS] Avoid using deprecated methods in dsl

15fb4787354a is described below

commit 15fb4787354a2d5dc97afb31010beb1f3cc3b73d
Author: Ruifeng Zheng
AuthorDate: Fri May 17 18:06:25 2024 +0800

[SPARK-48321][CONNECT][TESTS] Avoid using deprecated methods in dsl

### What changes were proposed in this pull request?

Avoid using deprecated methods in dsl.

### Why are the changes needed?

`putAllRenameColumnsMap` was deprecated.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46635 from zhengruifeng/with_col_rename_dsl.

Authored-by: Ruifeng Zheng
Signed-off-by: Ruifeng Zheng
---
 .../src/test/scala/org/apache/spark/sql/connect/dsl/package.scala | 8 +++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/dsl/package.scala
index da9a0865b8ca..b50c04e7540f 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -1019,7 +1019,13 @@ package object dsl {
           WithColumnsRenamed
             .newBuilder()
             .setInput(logicalPlan)
-            .putAllRenameColumnsMap(renameColumnsMap.asJava))
+            .addAllRenames(renameColumnsMap.toSeq.map { case (k, v) =>
+              WithColumnsRenamed.Rename
+                .newBuilder()
+                .setColName(k)
+                .setNewColName(v)
+                .build()
+            }.asJava))
         .build()
     }
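As a self-contained illustration of the migration pattern in the diff above: instead of passing a map straight into a deprecated map-typed proto field, each (old, new) pair becomes its own repeated message. The `Rename` case class below is a plain stand-in for the generated protobuf builder, not the real class.

```scala
// Stand-in for the generated WithColumnsRenamed.Rename protobuf message.
final case class Rename(colName: String, newColName: String)

object RenameMigrationSketch {
  def main(args: Array[String]): Unit = {
    val renameColumnsMap = Map("colA" -> "a", "colB" -> "b")

    // Mirrors `renameColumnsMap.toSeq.map { case (k, v) => ... }` in the diff:
    // one Rename message per map entry, instead of one map-typed field.
    val renames: Seq[Rename] = renameColumnsMap.toSeq.map { case (k, v) =>
      Rename(colName = k, newColName = v)
    }

    renames.foreach(println) // Rename(colA,a) and Rename(colB,b)
  }
}
```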
(spark) branch master updated: [SPARK-48319][PYTHON][CONNECT][TESTS] Test `assert_true` and `raise_error` with the same error class as Spark Classic
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 1ea156169ce8 [SPARK-48319][PYTHON][CONNECT][TESTS] Test `assert_true` and `raise_error` with the same error class as Spark Classic

1ea156169ce8 is described below

commit 1ea156169ce87dad31c3241bd1ffeb63bc3ac60d
Author: Ruifeng Zheng
AuthorDate: Fri May 17 17:03:24 2024 +0900

[SPARK-48319][PYTHON][CONNECT][TESTS] Test `assert_true` and `raise_error` with the same error class as Spark Classic

### What changes were proposed in this pull request?

Test `assert_true` and `raise_error` with the same error class as Spark Classic.

### Why are the changes needed?

https://github.com/apache/spark/commit/578931678f5a6d6b33ebdae4bf866871e46fbb83 made `assert_true` and `raise_error` in Spark Connect throw `SparkRuntimeException`, so the error is now the same as in Spark Classic.

### Does this PR introduce _any_ user-facing change?

No, test only.

### How was this patch tested?

CI.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46633 from zhengruifeng/test_assert_raise.

Authored-by: Ruifeng Zheng
Signed-off-by: Hyukjin Kwon
---
 python/pyspark/sql/tests/connect/test_parity_functions.py | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_parity_functions.py b/python/pyspark/sql/tests/connect/test_parity_functions.py
index 4fa1cf31b3b6..9dfde11ecbcc 100644
--- a/python/pyspark/sql/tests/connect/test_parity_functions.py
+++ b/python/pyspark/sql/tests/connect/test_parity_functions.py
@@ -21,14 +21,10 @@ from pyspark.sql.tests.test_functions import FunctionsTestsMixin
 from pyspark.testing.connectutils import should_test_connect, ReusedConnectTestCase
 
 if should_test_connect:
-    from pyspark.errors.exceptions.connect import SparkConnectException
     from pyspark.sql.connect.column import Column
 
 
 class FunctionsParityTests(FunctionsTestsMixin, ReusedConnectTestCase):
-    def test_assert_true(self):
-        self.check_assert_true(SparkConnectException)
-
     @unittest.skip("Spark Connect does not support Spark Context but the test depends on that.")
     def test_basic_functions(self):
         super().test_basic_functions()
@@ -41,15 +37,11 @@ class FunctionsParityTests(FunctionsTestsMixin, ReusedConnectTestCase):
     def test_input_file_name_reset_for_rdd(self):
         super().test_input_file_name_reset_for_rdd()
 
-    def test_raise_error(self):
-        self.check_raise_error(SparkConnectException)
-
     def test_sorting_functions_with_column(self):
         self.check_sorting_functions_with_column(Column)
 
 
 if __name__ == "__main__":
-    import unittest
     from pyspark.sql.tests.connect.test_parity_functions.py import *  # noqa: F401
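A rough Scala analogue of what the now-unified test asserts: per the commit referenced above, both Spark Classic and Spark Connect surface the failure as `SparkRuntimeException`, which is why the parity-test overrides could be deleted. The local-session setup below is an assumption for illustration, not taken from this email.

```scala
import org.apache.spark.SparkRuntimeException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, raise_error}

object RaiseErrorParitySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()

    try {
      spark.range(1).select(raise_error(lit("boom"))).collect()
    } catch {
      case t: Throwable =>
        // Depending on where the expression is evaluated, the
        // SparkRuntimeException may arrive wrapped in a SparkException,
        // so walk the cause chain to find it.
        var cause: Throwable = t
        while (cause != null && !cause.isInstanceOf[SparkRuntimeException]) {
          cause = cause.getCause
        }
        println(s"found SparkRuntimeException: ${cause != null}")
    }

    spark.stop()
  }
}
```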
(spark) branch master updated: [SPARK-48303][CORE] Reorganize `LogKeys`
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 5643cfb71d34 [SPARK-48303][CORE] Reorganize `LogKeys`

5643cfb71d34 is described below

commit 5643cfb71d343133a185aa257f137074f41abfb3
Author: panbingkun
AuthorDate: Thu May 16 23:20:23 2024 -0700

[SPARK-48303][CORE] Reorganize `LogKeys`

### What changes were proposed in this pull request?

The PR aims to reorganize `LogKeys`, which includes:

- Remove some unused `LogKeys`:
  ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE, DEFAULT_COMPACTION_INTERVAL, DRIVER_LIBRARY_PATH_KEY, EXISTING_JARS, EXPECTED_ANSWER, FILTERS, HAS_R_PACKAGE, JAR_ENTRY, LOG_KEY_FILE, NUM_ADDED_MASTERS, NUM_ADDED_WORKERS, NUM_PARTITION_VALUES, OUTPUT_LINE, OUTPUT_LINE_NUMBER, PARTITIONS_SIZE, RULE_BATCH_NAME, SERIALIZE_OUTPUT_LENGTH, SHELL_COMMAND, STREAM_SOURCE
- Merge `PARAMETER` into `PARAM` (some keys were fully spelled and some abbreviated, which was inconsistent):
  ESTIMATOR_PARAMETER_MAP -> ESTIMATOR_PARAM_MAP
  FUNCTION_PARAMETER -> FUNCTION_PARAM
  METHOD_PARAMETER_TYPES -> METHOD_PARAM_TYPES
- Merge `NUMBER` into `NUM` (abbreviation):
  MIN_VERSION_NUMBER -> MIN_VERSION_NUM
  RULE_NUMBER_OF_RUNS -> NUM_RULE_OF_RUNS
  VERSION_NUMBER -> VERSION_NUM
- Merge `TOTAL` into `NUM`:
  TOTAL_RECORDS_READ -> NUM_RECORDS_READ
  TRAIN_WORD_COUNT -> NUM_TRAIN_WORD
- Use `NUM` as a prefix:
  CHECKSUM_FILE_NUM -> NUM_CHECKSUM_FILE
  DATA_FILE_NUM -> NUM_DATA_FILE
  INDEX_FILE_NUM -> NUM_INDEX_FILE
- COUNT -> NUM:
  EXECUTOR_DESIRED_COUNT -> NUM_EXECUTOR_DESIRED
  EXECUTOR_LAUNCH_COUNT -> NUM_EXECUTOR_LAUNCH
  EXECUTOR_TARGET_COUNT -> NUM_EXECUTOR_TARGET
  KAFKA_PULLS_COUNT -> NUM_KAFKA_PULLS
  KAFKA_RECORDS_PULLED_COUNT -> NUM_KAFKA_RECORDS_PULLED
  MIN_FREQUENT_PATTERN_COUNT -> MIN_NUM_FREQUENT_PATTERN
  POD_COUNT -> NUM_POD
  POD_SHARED_SLOT_COUNT -> NUM_POD_SHARED_SLOT
  POD_TARGET_COUNT -> NUM_POD_TARGET
  RETRY_COUNT -> NUM_RETRY
- Fix a typo:
  MALFORMATTED_STIRNG -> MALFORMATTED_STRING
- Other:
  MAX_LOG_NUM_POLICY -> MAX_NUM_LOG_POLICY
  WEIGHTED_NUM -> NUM_WEIGHTED_EXAMPLES

Changes in other code are follow-on changes caused by the above adjustments.

### Why are the changes needed?

Let's make `LogKeys` easier to understand and more consistent.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass GA.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46612 from panbingkun/reorganize_logkey.
Authored-by: panbingkun
Signed-off-by: Gengliang Wang
---
 .../network/shuffle/RetryingBlockTransferor.java   |  6 +-
 .../scala/org/apache/spark/internal/LogKey.scala   | 68 --
 .../sql/connect/client/GrpcRetryHandler.scala      |  8 +--
 .../sql/kafka010/KafkaOffsetReaderAdmin.scala      |  4 +-
 .../sql/kafka010/KafkaOffsetReaderConsumer.scala   |  4 +-
 .../sql/kafka010/consumer/KafkaDataConsumer.scala  |  6 +-
 .../streaming/kinesis/KinesisBackedBlockRDD.scala  |  4 +-
 .../org/apache/spark/api/r/RBackendHandler.scala   |  4 +-
 .../spark/deploy/history/FsHistoryProvider.scala   |  2 +-
 .../org/apache/spark/deploy/master/Master.scala    |  2 +-
 .../apache/spark/ml/tree/impl/RandomForest.scala   |  4 +-
 .../apache/spark/ml/tuning/CrossValidator.scala    |  4 +-
 .../spark/ml/tuning/TrainValidationSplit.scala     |  4 +-
 .../org/apache/spark/mllib/feature/Word2Vec.scala  |  4 +-
 .../org/apache/spark/mllib/fpm/PrefixSpan.scala    |  4 +-
 .../apache/spark/mllib/linalg/VectorsSuite.scala   |  4 +-
 .../cluster/k8s/ExecutorPodsAllocator.scala        |  6 +-
 ...ernetesLocalDiskShuffleExecutorComponents.scala |  6 +-
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |  6 +-
 .../catalyst/expressions/V2ExpressionUtils.scala   |  4 +-
 .../spark/sql/catalyst/rules/RuleExecutor.scala    |  6 +-
 .../sql/execution/streaming/state/RocksDB.scala    | 18 +++---
 .../streaming/state/RocksDBFileManager.scala       | 22 +++
 .../state/RocksDBStateStoreProvider.scala          |  6 +-
 .../apache/hive/service/server/HiveServer2.java    |  2 +-
 .../spark/sql/hive/client/HiveClientImpl.scala     |  2 +-
 .../org/apache/spark/streaming/Checkpoint.scala    |  4 +-
 .../streaming/util/FileBasedWriteAheadLog.scala    |  4 +-
 28 files changed, 101 insertions(+), 117 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
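For orientation, a hedged sketch of how a renamed key is referenced at a call site in Spark's internal structured-logging framework. This is an internal API; the imports and the `MDC` usage below follow the pattern in the files touched by this commit, but the surrounding class is invented for illustration.

```scala
import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Call sites name LogKeys directly, which is why a rename like
// RETRY_COUNT -> NUM_RETRY touches every file in the diff stat above,
// not just LogKey.scala.
class RetrySketch extends Logging {
  def onRetry(numRetries: Int): Unit = {
    // Before this change the key here would have been LogKeys.RETRY_COUNT.
    logInfo(log"Retrying block transfer, attempt ${MDC(LogKeys.NUM_RETRY, numRetries)}")
  }
}
```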