(spark) branch branch-3.4 updated: [SPARK-48105][SS][3.5] Fix the race condition between state store unloading and snapshotting

2024-05-17 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 090022d475d6 [SPARK-48105][SS][3.5] Fix the race condition between 
state store unloading and snapshotting
090022d475d6 is described below

commit 090022d475d671ee345f22eb661f644b29ca28c5
Author: Huanli Wang 
AuthorDate: Wed May 15 14:52:04 2024 +0900

[SPARK-48105][SS][3.5] Fix the race condition between state store unloading 
and snapshotting

* When we close the HDFS state store, we should only remove the entry from 
`loadedMaps` rather than actively cleaning up the data. The JVM garbage 
collector can reclaim those objects.
* We should wait for the maintenance thread to stop before unloading the 
providers.

There are two race conditions between state store snapshotting and state 
store unloading which could result in query failure and potential data 
corruption.

Case 1:
1. The maintenance thread pool encounters some issues and calls 
[stopMaintenanceTask](https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L774), 
which in turn calls 
[threadPool.stop](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L587).
 However, this function doesn't wait for th [...]
2. The provider unload will [close the state 
store](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L719-L721), 
which [clears the values of 
loadedMaps](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L353-L355) 
for the HDFS-backed state store.
3. The maintenance thread that has not yet stopped may still be running and 
trying to take the snapshot after the data in the underlying 
`HDFSBackedStateStoreMap` has been removed. If this snapshot process completes 
successfully, we write corrupted data and the following batches consume it.
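
A minimal sketch of the ordering that Case 1 calls for, written in Python for brevity and not taken from the Spark code base. `loaded_providers`, `snapshot_task`, and `stop_maintenance_and_unload` are hypothetical names; the only point illustrated is that unloading begins after the pool has drained its in-flight maintenance work.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for loaded providers and their state (not Spark APIs).
loaded_providers = {"state_store_1": {"a1": 1, "a2": 2}}
snapshots = []


def snapshot_task(store_id):
    """Simulates a maintenance task that is still running when stop is requested."""
    data = loaded_providers[store_id]
    time.sleep(0.1)               # pretend the snapshot takes a while
    snapshots.append(dict(data))  # write out whatever the map holds right now


def stop_maintenance_and_unload(pool):
    # Block until queued and in-flight maintenance tasks have finished...
    pool.shutdown(wait=True)
    # ...and only then unload the providers.
    for data in loaded_providers.values():
        data.clear()
    loaded_providers.clear()


pool = ThreadPoolExecutor(max_workers=1)
pool.submit(snapshot_task, "state_store_1")
stop_maintenance_and_unload(pool)
```

Because the shutdown call blocks, the snapshot in this sketch is taken from the full map even though the unload empties it afterwards.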

Case 2:

1. In executor_1, the maintenance thread is about to take the snapshot for 
state_store_1. It retrieves the `HDFSBackedStateStoreMap` object from 
loadedMaps and then [releases the lock of the 
loadedMaps](https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L750-L751).
2. state_store_1 is loaded in another executor, e.g. executor_2.
3. Another state store, state_store_2, is loaded on executor_1, which reports 
to the driver via 
[reportActiveStoreInstance](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L854-L871).
4. executor_1 
[unloads](https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L713) 
the state stores that are no longer active, which clears the data entries in 
the `HDFSBackedStateStoreMap`.
5. The snapshotting thread terminates and uploads the incomplete snapshot to 
cloud storage because the [iterator doesn't have a next 
element](https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L634) 
after the clear.
6. Future batches consume the corrupted data.
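
The difference between clearing a map in place and merely dropping the reference to it can be sketched without threads. The Python below is an illustration under assumptions, not Spark code: `ProviderSketch` and both `close_*` methods are hypothetical, and the interleaving from Case 2 is replayed sequentially so the outcome is deterministic.

```python
class ProviderSketch:
    """Hypothetical stand-in for a provider that caches per-version state maps."""

    def __init__(self):
        self.loaded_maps = {1: {"a1": "v1", "a2": "v2"}}

    def close_with_active_cleanup(self):
        # Behaviour described as problematic: empty every cached map in place.
        for state_map in self.loaded_maps.values():
            state_map.clear()
        self.loaded_maps.clear()

    def close_dropping_entries_only(self):
        # Behaviour described by the fix: forget the entries, leave the maps alone.
        self.loaded_maps.clear()


def snapshot(state_map):
    # The snapshot writes out whatever the map yields at this moment.
    return sorted(state_map.items())


def run(close_name):
    provider = ProviderSketch()
    held = provider.loaded_maps[1]   # maintenance thread obtains the map
    getattr(provider, close_name)()  # unload happens in between
    return snapshot(held)            # snapshot continues with the held reference


broken = run("close_with_active_cleanup")
fixed = run("close_dropping_entries_only")
```

In this sketch `broken` is empty while `fixed` still contains both entries, because the snapshot keeps its own reference to a map that nobody emptied.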

No

```
[info] Run completed in 2 minutes, 55 seconds.
[info] Total number of tests run: 153
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 153, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 271 s (04:31), completed May 2, 2024, 6:26:33 PM
```
before this change

```
[info] - state store unload/close happens during the maintenance *** FAILED 
*** (648 milliseconds)
[info]   Vector("a1", "a10", "a11", "a12", "a13", "a14", "a15", "a16", 
"a17", "a18", "a19", "a2", "a20", "a3", "a4", "a5", "a6", "a7", "a8", "a9") did 
not equal ArrayBuffer("a8") (StateStoreSuite.scala:414)
[info]   Analysis:
[info]   Vector1(0: "a1" -> "a8", 1: "a10" -> , 2: "a11" -> , 3: "a12" -> , 
4: "a13" -> , 5: "a14" -> , 6: "a15" -> , 7: "a16" -> , 8: "a17" -> , 9: "a18" 
-> , 10: "a19" -> , 11: "a2" -> , 12: "a20" -> , 13: "a3" -> , 14: "a4" -> , 
15: "a5" -> , 16: "a6" -> , 17: "a7" -> , 18: "a8" -> , 19: "a9" -> )
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at 

(spark) branch branch-3.5 updated (c1dd4a5df693 -> 1a454287c01e)

2024-05-17 Thread gengliang

gengliang pushed a change to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


from c1dd4a5df693 [SPARK-48297][SQL] Fix a regression TRANSFORM clause with 
char/varchar
 add 1a454287c01e [SPARK-48294][SQL][3.5] Handle lowercase in 
nestedTypeMissingElementTypeError

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/sql/errors/QueryParsingErrors.scala  |  2 +-
 .../spark/sql/errors/QueryParsingErrorsSuite.scala| 19 +++
 2 files changed, 20 insertions(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-47818][CONNECT][FOLLOW-UP] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests

2024-05-17 Thread hvanhovell

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 51623785c38c [SPARK-47818][CONNECT][FOLLOW-UP] Introduce plan cache in 
SparkConnectPlanner to improve performance of Analyze requests
51623785c38c is described below

commit 51623785c38c9b17a6d91cb8e7f686459bd4803e
Author: Xi Lyu 
AuthorDate: Fri May 17 22:06:35 2024 +0200

[SPARK-47818][CONNECT][FOLLOW-UP] Introduce plan cache in 
SparkConnectPlanner to improve performance of Analyze requests

### What changes were proposed in this pull request?

In [this previous PR](https://github.com/apache/spark/pull/46012), we 
added two new confs for the plan cache: a static conf 
`spark.connect.session.planCache.maxSize` and a dynamic conf 
`spark.connect.session.planCache.enabled`. The plan cache is enabled by default 
with size 5. In this PR, we are marking them as internal because we don't 
expect users to change them.
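
How a size limit and an enabled flag might jointly gate such a cache can be sketched as follows. This is an illustration in Python, not the `SparkConnectPlanner` implementation: the class name, the least-recently-used eviction, and the `get_or_compute` method are all assumptions. Only the default size of 5, the "size of zero or less disables the cache" rule, and the "disabled flag wins" rule come from the conf descriptions in this commit.

```python
from collections import OrderedDict


class PlanCacheSketch:
    """Hypothetical bounded cache; names and eviction policy are assumptions."""

    def __init__(self, max_size=5, enabled=True):
        self.max_size = max_size  # fixed at construction, like a static conf
        self.enabled = enabled    # may be flipped later, like a dynamic conf
        self._entries = OrderedDict()

    def _active(self):
        # Disabled when the flag is off or the size is zero or negative.
        return self.enabled and self.max_size > 0

    def get_or_compute(self, key, compute):
        if not self._active():
            return compute()
        if key in self._entries:
            self._entries.move_to_end(key)  # mark as most recently used
            return self._entries[key]
        value = compute()
        self._entries[key] = value
        if len(self._entries) > self.max_size:
            self._entries.popitem(last=False)  # evict the least recently used
        return value


calls = []


def analyze(plan):
    calls.append(plan)
    return plan.upper()


cache = PlanCacheSketch()
first = cache.get_or_compute("select 1", lambda: analyze("select 1"))
second = cache.get_or_compute("select 1", lambda: analyze("select 1"))
```

The second lookup is served from the cache, so `analyze` runs once; setting `cache.enabled = False` afterwards makes every call recompute.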

### Why are the changes needed?

These two confs are not expected to be used under normal circumstances, and 
we don't need to document them on the Spark Configuration reference page.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46638 from xi-db/SPARK-47818-plan-cache-followup2.

Authored-by: Xi Lyu 
Signed-off-by: Herman van Hovell 
---
 .../src/main/scala/org/apache/spark/sql/connect/config/Connect.scala| 2 ++
 1 file changed, 2 insertions(+)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index f9d604712420..91f2e23a8b64 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -279,6 +279,7 @@ object Connect {
   .doc("Sets the maximum number of cached resolved logical plans in Spark 
Connect Session." +
 " If set to a value less or equal than zero will disable the plan 
cache.")
   .version("4.0.0")
+  .internal()
   .intConf
   .createWithDefault(5)
 
@@ -289,6 +290,7 @@ object Connect {
 s" When false, the cache is disabled even if 
'${CONNECT_SESSION_PLAN_CACHE_SIZE.key}' is" +
 " greater than zero. The caching is best-effort and not guaranteed.")
   .version("4.0.0")
+  .internal()
   .booleanConf
   .createWithDefault(true)
 }





(spark) branch master updated (15fb4787354a -> 3edd6c7e1d50)

2024-05-17 Thread wenchen

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 15fb4787354a [SPARK-48321][CONNECT][TESTS] Avoid using deprecated 
methods in dsl
 add 3edd6c7e1d50 [SPARK-48312][SQL] Improve 
Alias.removeNonInheritableMetadata performance

No new revisions were added by this update.

Summary of changes:
 .../main/scala/org/apache/spark/sql/types/Metadata.scala   |  7 +++
 .../spark/sql/catalyst/expressions/namedExpressions.scala  | 14 +++---
 2 files changed, 18 insertions(+), 3 deletions(-)





(spark) branch master updated: [SPARK-48321][CONNECT][TESTS] Avoid using deprecated methods in dsl

2024-05-17 Thread ruifengz

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 15fb4787354a [SPARK-48321][CONNECT][TESTS] Avoid using deprecated 
methods in dsl
15fb4787354a is described below

commit 15fb4787354a2d5dc97afb31010beb1f3cc3b73d
Author: Ruifeng Zheng 
AuthorDate: Fri May 17 18:06:25 2024 +0800

[SPARK-48321][CONNECT][TESTS] Avoid using deprecated methods in dsl

### What changes were proposed in this pull request?
Avoid using deprecated methods in dsl

### Why are the changes needed?
`putAllRenameColumnsMap` was deprecated
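
The shape of the change is a conversion from one map argument to a list of per-column rename entries. A rough Python analogue, in which the `Rename` dataclass is a hypothetical stand-in for the generated `WithColumnsRenamed.Rename` message and `to_renames` is an invented helper:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Rename:
    """Hypothetical stand-in for one rename entry (old name, new name)."""
    col_name: str
    new_col_name: str


def to_renames(rename_columns_map):
    # One entry per (old, new) pair, in the map's iteration order.
    return [Rename(col_name=k, new_col_name=v) for k, v in rename_columns_map.items()]


renames = to_renames({"id": "key", "name": "label"})
```

A list of entries keeps an explicit order, which a plain map field does not promise; whether that motivated the deprecation is not stated in this commit.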

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #46635 from zhengruifeng/with_col_rename_dsl.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 .../src/test/scala/org/apache/spark/sql/connect/dsl/package.scala | 8 +++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/dsl/package.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/dsl/package.scala
index da9a0865b8ca..b50c04e7540f 100644
--- 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -1019,7 +1019,13 @@ package object dsl {
             WithColumnsRenamed
               .newBuilder()
               .setInput(logicalPlan)
-              .putAllRenameColumnsMap(renameColumnsMap.asJava))
+              .addAllRenames(renameColumnsMap.toSeq.map { case (k, v) =>
+                WithColumnsRenamed.Rename
+                  .newBuilder()
+                  .setColName(k)
+                  .setNewColName(v)
+                  .build()
+              }.asJava))
           .build()
       }
 





(spark) branch master updated: [SPARK-48319][PYTHON][CONNECT][TESTS] Test `assert_true` and `raise_error` with the same error class as Spark Classic

2024-05-17 Thread gurwls223

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1ea156169ce8 [SPARK-48319][PYTHON][CONNECT][TESTS] Test `assert_true` 
and `raise_error` with the same error class as Spark Classic
1ea156169ce8 is described below

commit 1ea156169ce87dad31c3241bd1ffeb63bc3ac60d
Author: Ruifeng Zheng 
AuthorDate: Fri May 17 17:03:24 2024 +0900

[SPARK-48319][PYTHON][CONNECT][TESTS] Test `assert_true` and `raise_error` 
with the same error class as Spark Classic

### What changes were proposed in this pull request?
Test `assert_true` and `raise_error` with the same error class as Spark 
Classic

### Why are the changes needed?

https://github.com/apache/spark/commit/578931678f5a6d6b33ebdae4bf866871e46fbb83 
made `assert_true` and `raise_error` in Spark Connect throw 
`SparkRuntimeException`, so the error is now the same as in Spark Classic.
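
The testing pattern involved can be sketched with plain `unittest`: a mixin owns the test and takes the expected error type as a parameter, and a parity suite only needs an override while its backend raises a different type. Everything below is a hypothetical stand-in (no PySpark import), so the class and function names are assumptions, not the real test helpers.

```python
import unittest


class SparkRuntimeExceptionSketch(Exception):
    """Hypothetical stand-in for the error type shared by both backends."""


def raise_error(message):
    # Stand-in for the function under test; both backends now raise the same type.
    raise SparkRuntimeExceptionSketch(message)


class FunctionsTestsMixinSketch:
    def check_raise_error(self, error_type):
        with self.assertRaises(error_type):
            raise_error("boom")

    def test_raise_error(self):
        self.check_raise_error(SparkRuntimeExceptionSketch)


class ClassicTestsSketch(FunctionsTestsMixinSketch, unittest.TestCase):
    pass


class ParityTestsSketch(FunctionsTestsMixinSketch, unittest.TestCase):
    pass  # no override needed once the error types match


def run_case(case):
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(case)
    result = unittest.TestResult()
    suite.run(result)
    return result


parity_result = run_case(ParityTestsSketch)
```

With matching error types the parity class inherits the mixin's test unchanged, which is why this commit can delete the two overrides.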

### Does this PR introduce _any_ user-facing change?
no, test only

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #46633 from zhengruifeng/test_assert_raise.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/tests/connect/test_parity_functions.py | 8 
 1 file changed, 8 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_parity_functions.py 
b/python/pyspark/sql/tests/connect/test_parity_functions.py
index 4fa1cf31b3b6..9dfde11ecbcc 100644
--- a/python/pyspark/sql/tests/connect/test_parity_functions.py
+++ b/python/pyspark/sql/tests/connect/test_parity_functions.py
@@ -21,14 +21,10 @@ from pyspark.sql.tests.test_functions import FunctionsTestsMixin
 from pyspark.testing.connectutils import should_test_connect, ReusedConnectTestCase
 
 if should_test_connect:
-    from pyspark.errors.exceptions.connect import SparkConnectException
     from pyspark.sql.connect.column import Column
 
 
 class FunctionsParityTests(FunctionsTestsMixin, ReusedConnectTestCase):
-    def test_assert_true(self):
-        self.check_assert_true(SparkConnectException)
-
     @unittest.skip("Spark Connect does not support Spark Context but the test depends on that.")
     def test_basic_functions(self):
         super().test_basic_functions()
@@ -41,15 +37,11 @@ class FunctionsParityTests(FunctionsTestsMixin, ReusedConnectTestCase):
     def test_input_file_name_reset_for_rdd(self):
         super().test_input_file_name_reset_for_rdd()
 
-    def test_raise_error(self):
-        self.check_raise_error(SparkConnectException)
-
     def test_sorting_functions_with_column(self):
         self.check_sorting_functions_with_column(Column)
 
 
 if __name__ == "__main__":
-    import unittest
     from pyspark.sql.tests.connect.test_parity_functions import *  # noqa: F401
 
     try:





(spark) branch master updated: [SPARK-48303][CORE] Reorganize `LogKeys`

2024-05-17 Thread gengliang

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5643cfb71d34 [SPARK-48303][CORE] Reorganize `LogKeys`
5643cfb71d34 is described below

commit 5643cfb71d343133a185aa257f137074f41abfb3
Author: panbingkun 
AuthorDate: Thu May 16 23:20:23 2024 -0700

[SPARK-48303][CORE] Reorganize `LogKeys`

### What changes were proposed in this pull request?
This PR aims to reorganize `LogKeys`, including:
- remove some unused `LogKeys`
  ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE
  DEFAULT_COMPACTION_INTERVAL
  DRIVER_LIBRARY_PATH_KEY
  EXISTING_JARS
  EXPECTED_ANSWER
  FILTERS
  HAS_R_PACKAGE
  JAR_ENTRY
  LOG_KEY_FILE
  NUM_ADDED_MASTERS
  NUM_ADDED_WORKERS
  NUM_PARTITION_VALUES
  OUTPUT_LINE
  OUTPUT_LINE_NUMBER
  PARTITIONS_SIZE
  RULE_BATCH_NAME
  SERIALIZE_OUTPUT_LENGTH
  SHELL_COMMAND
  STREAM_SOURCE

- merge `PARAMETER` into `PARAM` (some keys spelled the word out in full and 
others abbreviated it, so usage was inconsistent)
  ESTIMATOR_PARAMETER_MAP -> ESTIMATOR_PARAM_MAP
  FUNCTION_PARAMETER -> FUNCTION_PARAM
  METHOD_PARAMETER_TYPES -> METHOD_PARAM_TYPES

- merge `NUMBER` into `NUM` (abbreviations)
  MIN_VERSION_NUMBER -> MIN_VERSION_NUM
  RULE_NUMBER_OF_RUNS -> NUM_RULE_OF_RUNS
  VERSION_NUMBER -> VERSION_NUM

- merge `TOTAL` into `NUM`
  TOTAL_RECORDS_READ -> NUM_RECORDS_READ
  TRAIN_WORD_COUNT -> NUM_TRAIN_WORD

- `NUM` as prefix
  CHECKSUM_FILE_NUM -> NUM_CHECKSUM_FILE
  DATA_FILE_NUM -> NUM_DATA_FILE
  INDEX_FILE_NUM -> NUM_INDEX_FILE

- `COUNT` -> `NUM`
  EXECUTOR_DESIRED_COUNT -> NUM_EXECUTOR_DESIRED
  EXECUTOR_LAUNCH_COUNT -> NUM_EXECUTOR_LAUNCH
  EXECUTOR_TARGET_COUNT -> NUM_EXECUTOR_TARGET
  KAFKA_PULLS_COUNT -> NUM_KAFKA_PULLS
  KAFKA_RECORDS_PULLED_COUNT -> NUM_KAFKA_RECORDS_PULLED
  MIN_FREQUENT_PATTERN_COUNT -> MIN_NUM_FREQUENT_PATTERN
  POD_COUNT -> NUM_POD
  POD_SHARED_SLOT_COUNT -> NUM_POD_SHARED_SLOT
  POD_TARGET_COUNT -> NUM_POD_TARGET
  RETRY_COUNT -> NUM_RETRY

- fix some typos
  MALFORMATTED_STIRNG -> MALFORMATTED_STRING

- other
  MAX_LOG_NUM_POLICY -> MAX_NUM_LOG_POLICY
  WEIGHTED_NUM -> NUM_WEIGHTED_EXAMPLES

Changes in other code are additional changes caused by the above 
adjustments.
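
The most common rule in the list above, moving a trailing `_COUNT` to a leading `NUM_`, can be written as a small helper. This is a hypothetical illustration in Python, not a tool used by the PR, and it does not cover every rename: for example `MIN_FREQUENT_PATTERN_COUNT` became `MIN_NUM_FREQUENT_PATTERN`, which this function would not produce.

```python
def count_suffix_to_num_prefix(key):
    """Rewrites X_COUNT as NUM_X; any other name is returned unchanged."""
    suffix = "_COUNT"
    if key.endswith(suffix):
        return "NUM_" + key[: -len(suffix)]
    return key


renamed = [
    count_suffix_to_num_prefix(k)
    for k in ("EXECUTOR_DESIRED_COUNT", "POD_COUNT", "RETRY_COUNT", "FILTERS")
]
```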

### Why are the changes needed?
Let's make `LogKeys` easier to understand and more consistent.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #46612 from panbingkun/reorganize_logkey.

Authored-by: panbingkun 
Signed-off-by: Gengliang Wang 
---
 .../network/shuffle/RetryingBlockTransferor.java   |  6 +-
 .../scala/org/apache/spark/internal/LogKey.scala   | 68 --
 .../sql/connect/client/GrpcRetryHandler.scala  |  8 +--
 .../sql/kafka010/KafkaOffsetReaderAdmin.scala  |  4 +-
 .../sql/kafka010/KafkaOffsetReaderConsumer.scala   |  4 +-
 .../sql/kafka010/consumer/KafkaDataConsumer.scala  |  6 +-
 .../streaming/kinesis/KinesisBackedBlockRDD.scala  |  4 +-
 .../org/apache/spark/api/r/RBackendHandler.scala   |  4 +-
 .../spark/deploy/history/FsHistoryProvider.scala   |  2 +-
 .../org/apache/spark/deploy/master/Master.scala|  2 +-
 .../apache/spark/ml/tree/impl/RandomForest.scala   |  4 +-
 .../apache/spark/ml/tuning/CrossValidator.scala|  4 +-
 .../spark/ml/tuning/TrainValidationSplit.scala |  4 +-
 .../org/apache/spark/mllib/feature/Word2Vec.scala  |  4 +-
 .../org/apache/spark/mllib/fpm/PrefixSpan.scala|  4 +-
 .../apache/spark/mllib/linalg/VectorsSuite.scala   |  4 +-
 .../cluster/k8s/ExecutorPodsAllocator.scala|  6 +-
 ...ernetesLocalDiskShuffleExecutorComponents.scala |  6 +-
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |  6 +-
 .../catalyst/expressions/V2ExpressionUtils.scala   |  4 +-
 .../spark/sql/catalyst/rules/RuleExecutor.scala|  6 +-
 .../sql/execution/streaming/state/RocksDB.scala| 18 +++---
 .../streaming/state/RocksDBFileManager.scala   | 22 +++
 .../state/RocksDBStateStoreProvider.scala  |  6 +-
 .../apache/hive/service/server/HiveServer2.java|  2 +-
 .../spark/sql/hive/client/HiveClientImpl.scala |  2 +-
 .../org/apache/spark/streaming/Checkpoint.scala|  4 +-
 .../streaming/util/FileBasedWriteAheadLog.scala|  4 +-
 28 files changed, 101 insertions(+), 117 deletions(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java