[spark] branch master updated: [SPARK-36052][K8S][FOLLOWUP] Update config version to 3.2.0
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ea13c5a [SPARK-36052][K8S][FOLLOWUP] Update config version to 3.2.0 ea13c5a is described below commit ea13c5a743feb737d6ff5d28c3f8c968c293 Author: Dongjoon Hyun AuthorDate: Tue Aug 17 10:28:02 2021 +0900 [SPARK-36052][K8S][FOLLOWUP] Update config version to 3.2.0 ### What changes were proposed in this pull request? This PR is a follow-up to update the version of config, `spark.kubernetes.allocation.maxPendingPods`, from 3.3.0 to 3.2.0. ### Why are the changes needed? SPARK-36052 landed at branch-3.2 to fix a bug. ### Does this PR introduce _any_ user-facing change? Yes, but this is a new configuration to fix a bug. ### How was this patch tested? Pass the CIs. Closes #33755 from dongjoon-hyun/SPARK-36052. Authored-by: Dongjoon Hyun Signed-off-by: Hyukjin Kwon --- .../core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 2af05f1..2c4adee 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -601,7 +601,7 @@ private[spark] object Config extends Logging { "also counted into this limit as they will change into pending PODs by time. 
" + "This limit is independent from the resource profiles as it limits the sum of all " + "allocation for all the used resource profiles.") - .version("3.3.0") + .version("3.2.0") .intConf .checkValue(value => value > 0, "Maximum number of pending pods should be a positive integer") .createWithDefault(Int.MaxValue) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.2 updated: [SPARK-36052][K8S] Introducing a limit for pending PODs
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new eb09be9 [SPARK-36052][K8S] Introducing a limit for pending PODs eb09be9 is described below commit eb09be9e68737bf2f29ca5391874f4c5fa0de3e2 Author: attilapiros AuthorDate: Tue Aug 10 20:16:21 2021 -0700 [SPARK-36052][K8S] Introducing a limit for pending PODs ### What changes were proposed in this pull request? Introducing a limit for pending PODs (newly created/requested executors included). This limit is global for all the resource profiles, so first we have to count all the newly created and pending PODs (decreased by the ones which were requested to be deleted) and then we can share the remaining pending POD slots among the resource profiles. ### Why are the changes needed? Without this PR, dynamic allocation could request too many PODs, which could overload the K8S scheduler and affect the scheduling of PODs. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? With new unit tests. Closes #33492 from attilapiros/SPARK-36052.
Authored-by: attilapiros Signed-off-by: Dongjoon Hyun (cherry picked from commit 1dced492fb286a7ada73d886fe264f5df0e2b3da) Signed-off-by: Dongjoon Hyun --- .../scala/org/apache/spark/deploy/k8s/Config.scala | 12 +++ .../cluster/k8s/ExecutorPodsAllocator.scala| 85 +++--- .../cluster/k8s/ExecutorPodsAllocatorSuite.scala | 77 3 files changed, 148 insertions(+), 26 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 9fceca7..4d352f7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -543,6 +543,18 @@ private[spark] object Config extends Logging { .checkValue(delay => delay > 0, "delay must be a positive time value") .createWithDefaultString("30s") + val KUBERNETES_MAX_PENDING_PODS = +ConfigBuilder("spark.kubernetes.allocation.maxPendingPods") + .doc("Maximum number of pending PODs allowed during executor allocation for this " + +"application. Those newly requested executors which are unknown by Kubernetes yet are " + +"also counted into this limit as they will change into pending PODs by time. " + +"This limit is independent from the resource profiles as it limits the sum of all " + +"allocation for all the used resource profiles.") + .version("3.2.0") + .intConf + .checkValue(value => value > 0, "Maximum number of pending pods should be a positive integer") + .createWithDefault(Int.MaxValue) + val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label." val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation." val KUBERNETES_DRIVER_SERVICE_ANNOTATION_PREFIX = "spark.kubernetes.driver.service.annotation." 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index d6dc13e..cee5360 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -57,6 +57,8 @@ private[spark] class ExecutorPodsAllocator( private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + private val maxPendingPods = conf.get(KUBERNETES_MAX_PENDING_PODS) + private val podCreationTimeout = math.max( podAllocationDelay * 5, conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT)) @@ -217,9 +219,15 @@ private[spark] class ExecutorPodsAllocator( } } +// sum of all the pending pods unknown by the scheduler (total for all the resources) var totalPendingCount = 0 -// The order we request executors for each ResourceProfile is not guaranteed. -totalExpectedExecutorsPerResourceProfileId.asScala.foreach { case (rpId, targetNum) => +// total not running pods (including scheduler known & unknown, pending & newly requested ones) +var totalNotRunningPodCount = 0 +val podsToAllocateWithRpId = totalExpectedExecutorsPerResourceProfileId + .asScala + .toSeq + .sortBy(_._1) + .flatMap { case (rpId, targetNum) => val podsForRpId = rpIdToExecsAndPodState.getOrElse(rpId, mutable.HashMap.empty) val currentRunningCount = podsForRpId.values.count { @@ -235,7 +243,7 @@ private[spark] class ExecutorPodsAllocator( } // This variab
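The allocation rule described in SPARK-36052 (count every not-yet-running POD against one global budget, then share what is left among the resource profiles) can be sketched in plain Python. This is a simplified model, not the `ExecutorPodsAllocator` code: `split_slots` and `plan_allocation` are hypothetical names, the per-profile counts are assumed to be already reduced by PODs requested for deletion, and splitting the remaining slots evenly with the remainder going to the lowest resource profile ids is an assumption about how the shared slots are divided.

```python
from typing import Dict, List, Tuple


def split_slots(consumers: List[int], slots: int) -> List[Tuple[int, int]]:
    """Split `slots` as evenly as possible; the first `slots % n` consumers get one extra."""
    base, extra = divmod(slots, len(consumers))
    return [(c, base + 1 if i < extra else base) for i, c in enumerate(consumers)]


def plan_allocation(
    max_pending_pods: int,
    not_running_per_rp: Dict[int, int],
    missing_per_rp: Dict[int, int],
    batch_size: int,
) -> Dict[int, int]:
    """Hypothetical planner: number of new executor PODs to request per resource profile id.

    not_running_per_rp: pending + newly requested PODs (minus deletions) per profile.
    missing_per_rp: target executors minus PODs the profile already has.
    """
    # The limit is global, so the budget is computed over all resource profiles.
    remaining = max_pending_pods - sum(not_running_per_rp.values())
    # Visit profiles in a deterministic order (sorted by id), as the patch does with sortBy(_._1).
    needy = sorted(rp for rp, missing in missing_per_rp.items() if missing > 0)
    if remaining <= 0 or not needy:
        return {}
    plan = {}
    for rp, share in split_slots(needy, remaining):
        # Never exceed what the profile needs, the batch size, or its share of the budget.
        plan[rp] = min(missing_per_rp[rp], batch_size, share)
    return plan
```

For example, `plan_allocation(10, {0: 4, 1: 2}, {0: 5, 1: 1}, batch_size=5)` returns `{0: 2, 1: 1}`: six PODs are already outstanding, so only four slots remain, and each of the two profiles gets a share of two. With the default limit of `Int.MaxValue` the budget effectively never binds and the batch size dominates.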
[spark] branch branch-3.0 updated (763a25a -> ae07b63)
This is an automated email from the ASF dual-hosted git repository. srowen pushed a change to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git. from 763a25a Update Spark key negotiation protocol add ae07b63 [HOTFIX] Add missing deps update for commit protocol change No new revisions were added by this update. Summary of changes: dev/deps/spark-deps-hadoop-2.7-hive-1.2 | 1 + 1 file changed, 1 insertion(+)
[spark] branch branch-3.0 updated (dc0f1e5 -> 763a25a)
This is an automated email from the ASF dual-hosted git repository. srowen pushed a change to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git. from dc0f1e5 [SPARK-36500][CORE] Fix temp_shuffle file leaking when a task is interrupted add 763a25a Update Spark key negotiation protocol No new revisions were added by this update. Summary of changes: common/network-common/pom.xml | 4 + .../spark/network/crypto/AuthClientBootstrap.java | 6 +- .../apache/spark/network/crypto/AuthEngine.java| 420 + .../{ServerResponse.java => AuthMessage.java} | 56 ++- .../spark/network/crypto/AuthRpcHandler.java | 6 +- .../spark/network/crypto/ClientChallenge.java | 101 - .../java/org/apache/spark/network/crypto/README.md | 217 --- .../spark/network/crypto/AuthEngineSuite.java | 182 ++--- .../spark/network/crypto/AuthMessagesSuite.java| 46 +-- dev/deps/spark-deps-hadoop-2.7-hive-2.3| 1 + dev/deps/spark-deps-hadoop-3.2-hive-2.3| 1 + pom.xml| 8 +- 12 files changed, 432 insertions(+), 616 deletions(-) rename common/network-common/src/main/java/org/apache/spark/network/crypto/{ServerResponse.java => AuthMessage.java} (53%) delete mode 100644 common/network-common/src/main/java/org/apache/spark/network/crypto/ClientChallenge.java
[spark] branch branch-3.2 updated: [SPARK-36521][SQL] Disallow comparison between Interval and String
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 41e5144 [SPARK-36521][SQL] Disallow comparison between Interval and String 41e5144 is described below commit 41e5144b53d21d4c67e35250594ee418bdfba136 Author: Gengliang Wang AuthorDate: Mon Aug 16 22:41:14 2021 +0300 [SPARK-36521][SQL] Disallow comparison between Interval and String ### What changes were proposed in this pull request? Disallow comparison between Interval and String in the default type coercion rules. ### Why are the changes needed? If a binary comparison contains interval type and string type, we can't decide which interval type the string should be promoted as. There are many possible interval types, such as year interval, month interval, day interval, hour interval, etc. ### Does this PR introduce _any_ user-facing change? No, the new interval type is not released yet. ### How was this patch tested? Existing UT Closes #33750 from gengliangwang/disallowCom. 
Authored-by: Gengliang Wang Signed-off-by: Max Gekk (cherry picked from commit 26d6b952dcf7d387930701396de9cef679df7432) Signed-off-by: Max Gekk --- .../spark/sql/catalyst/analysis/TypeCoercion.scala | 16 +++- .../test/resources/sql-tests/inputs/interval.sql | 6 ++ .../sql-tests/results/ansi/interval.sql.out| 56 +- .../resources/sql-tests/results/interval.sql.out | 86 ++ 4 files changed, 148 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 23654af..863bdc0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -862,6 +862,18 @@ object TypeCoercion extends TypeCoercionBase { case _ => None } + // Return whether a string literal can be promoted as the give data type in a binary comparison. + private def canPromoteAsInBinaryComparison(dt: DataType) = dt match { +// If a binary comparison contains interval type and string type, we can't decide which +// interval type the string should be promoted as. There are many possible interval +// types, such as year interval, month interval, day interval, hour interval, etc. +case _: YearMonthIntervalType | _: DayTimeIntervalType => false +// There is no need to add `Cast` for comparison between strings. +case _: StringType => false +case _: AtomicType => true +case _ => false + } + /** * This function determines the target type of a comparison operator when one operand * is a String and the other is not. 
It also handles when one op is a Date and the @@ -891,8 +903,8 @@ object TypeCoercion extends TypeCoercionBase { case (n: DecimalType, s: StringType) => Some(DoubleType) case (s: StringType, n: DecimalType) => Some(DoubleType) -case (l: StringType, r: AtomicType) if r != StringType => Some(r) -case (l: AtomicType, r: StringType) if l != StringType => Some(l) +case (l: StringType, r: AtomicType) if canPromoteAsInBinaryComparison(r) => Some(r) +case (l: AtomicType, r: StringType) if canPromoteAsInBinaryComparison(l) => Some(l) case (l, r) => None } diff --git a/sql/core/src/test/resources/sql-tests/inputs/interval.sql b/sql/core/src/test/resources/sql-tests/inputs/interval.sql index 618cf16..279c5441 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/interval.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/interval.sql @@ -341,9 +341,15 @@ SELECT INTERVAL 1 MONTH > INTERVAL 20 DAYS; SELECT INTERVAL '1' DAY < '1'; SELECT INTERVAL '1' DAY = '1'; SELECT INTERVAL '1' DAY > '1'; +SELECT '1' < INTERVAL '1' DAY; +SELECT '1' = INTERVAL '1' DAY; +SELECT '1' > INTERVAL '1' DAY; SELECT INTERVAL '1' YEAR < '1'; SELECT INTERVAL '1' YEAR = '1'; SELECT INTERVAL '1' YEAR > '1'; +SELECT '1' < INTERVAL '1' YEAR; +SELECT '1' = INTERVAL '1' YEAR; +SELECT '1' > INTERVAL '1' YEAR; SELECT array(INTERVAL '1' YEAR, INTERVAL '1' MONTH); SELECT array(INTERVAL '1' DAY, INTERVAL '01:01' HOUR TO MINUTE); diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out index e0bf076..1aa0920 100644 --- a/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 251 +-- Number of queries: 257 -- !query @@ -2
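The effect of the new `canPromoteAsInBinaryComparison` guard can be modelled in a few lines of Python. This is a sketch under stated assumptions: data types are represented by hypothetical string tags rather than Spark's `DataType` classes, and only the two string/atomic cases from the diff are modelled (the earlier decimal and date/timestamp cases of the real rule are omitted). A `None` result corresponds to "no common type", which surfaces as the `data type mismatch` analysis error seen in the updated `interval.sql.out` results.

```python
from typing import Optional

# Hypothetical string tags standing in for Spark SQL DataType classes.
# As in Spark, the two interval types are treated as atomic types.
INTERVAL_TYPES = {"year_month_interval", "day_time_interval"}
ATOMIC_TYPES = {"boolean", "int", "double", "date", "timestamp", "string"} | INTERVAL_TYPES


def can_promote_string_to(dt: str) -> bool:
    """Mirror of the new guard: may a string operand be cast to `dt` for a comparison?"""
    if dt in INTERVAL_TYPES:
        return False  # ambiguous: the string could denote any interval unit
    if dt == "string":
        return False  # string vs. string needs no cast
    return dt in ATOMIC_TYPES


def comparison_target_type(left: str, right: str) -> Optional[str]:
    """Only the two string/atomic cases from the diff; None means 'no coercion found'."""
    if left == "string" and can_promote_string_to(right):
        return right
    if right == "string" and can_promote_string_to(left):
        return left
    return None
```

Both operand orders are covered, which is why the patch adds the `'1' < INTERVAL '1' DAY` style queries alongside the existing `INTERVAL '1' DAY < '1'` ones.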
[spark] branch master updated: [SPARK-36521][SQL] Disallow comparison between Interval and String
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 26d6b95 [SPARK-36521][SQL] Disallow comparison between Interval and String 26d6b95 is described below commit 26d6b952dcf7d387930701396de9cef679df7432 Author: Gengliang Wang AuthorDate: Mon Aug 16 22:41:14 2021 +0300 [SPARK-36521][SQL] Disallow comparison between Interval and String ### What changes were proposed in this pull request? Disallow comparison between Interval and String in the default type coercion rules. ### Why are the changes needed? If a binary comparison contains interval type and string type, we can't decide which interval type the string should be promoted as. There are many possible interval types, such as year interval, month interval, day interval, hour interval, etc. ### Does this PR introduce _any_ user-facing change? No, the new interval type is not released yet. ### How was this patch tested? Existing UT Closes #33750 from gengliangwang/disallowCom. 
Authored-by: Gengliang Wang Signed-off-by: Max Gekk --- .../spark/sql/catalyst/analysis/TypeCoercion.scala | 16 +++- .../test/resources/sql-tests/inputs/interval.sql | 6 ++ .../sql-tests/results/ansi/interval.sql.out| 56 +- .../resources/sql-tests/results/interval.sql.out | 86 ++ 4 files changed, 148 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 23654af..863bdc0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -862,6 +862,18 @@ object TypeCoercion extends TypeCoercionBase { case _ => None } + // Return whether a string literal can be promoted as the give data type in a binary comparison. + private def canPromoteAsInBinaryComparison(dt: DataType) = dt match { +// If a binary comparison contains interval type and string type, we can't decide which +// interval type the string should be promoted as. There are many possible interval +// types, such as year interval, month interval, day interval, hour interval, etc. +case _: YearMonthIntervalType | _: DayTimeIntervalType => false +// There is no need to add `Cast` for comparison between strings. +case _: StringType => false +case _: AtomicType => true +case _ => false + } + /** * This function determines the target type of a comparison operator when one operand * is a String and the other is not. 
It also handles when one op is a Date and the @@ -891,8 +903,8 @@ object TypeCoercion extends TypeCoercionBase { case (n: DecimalType, s: StringType) => Some(DoubleType) case (s: StringType, n: DecimalType) => Some(DoubleType) -case (l: StringType, r: AtomicType) if r != StringType => Some(r) -case (l: AtomicType, r: StringType) if l != StringType => Some(l) +case (l: StringType, r: AtomicType) if canPromoteAsInBinaryComparison(r) => Some(r) +case (l: AtomicType, r: StringType) if canPromoteAsInBinaryComparison(l) => Some(l) case (l, r) => None } diff --git a/sql/core/src/test/resources/sql-tests/inputs/interval.sql b/sql/core/src/test/resources/sql-tests/inputs/interval.sql index 618cf16..279c5441 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/interval.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/interval.sql @@ -341,9 +341,15 @@ SELECT INTERVAL 1 MONTH > INTERVAL 20 DAYS; SELECT INTERVAL '1' DAY < '1'; SELECT INTERVAL '1' DAY = '1'; SELECT INTERVAL '1' DAY > '1'; +SELECT '1' < INTERVAL '1' DAY; +SELECT '1' = INTERVAL '1' DAY; +SELECT '1' > INTERVAL '1' DAY; SELECT INTERVAL '1' YEAR < '1'; SELECT INTERVAL '1' YEAR = '1'; SELECT INTERVAL '1' YEAR > '1'; +SELECT '1' < INTERVAL '1' YEAR; +SELECT '1' = INTERVAL '1' YEAR; +SELECT '1' > INTERVAL '1' YEAR; SELECT array(INTERVAL '1' YEAR, INTERVAL '1' MONTH); SELECT array(INTERVAL '1' DAY, INTERVAL '01:01' HOUR TO MINUTE); diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out index e0bf076..1aa0920 100644 --- a/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 251 +-- Number of queries: 257 -- !query @@ -2328,6 +2328,33 @@ cannot resolve '(INTERVAL '1' DAY > '1')' due to data type mismatch: differing t -- !q
[spark] branch master updated (05cd5f9 -> 3d57e00)
This is an automated email from the ASF dual-hosted git repository. viirya pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 05cd5f9 [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message in BlockPushErrorHandler in client add 3d57e00 [SPARK-36041][SS][DOCS] Introduce the RocksDBStateStoreProvider in the programming guide No new revisions were added by this update. Summary of changes: docs/structured-streaming-programming-guide.md | 75 +- 1 file changed, 74 insertions(+), 1 deletion(-)
[spark] branch branch-3.2 updated: [SPARK-36041][SS][DOCS] Introduce the RocksDBStateStoreProvider in the programming guide
This is an automated email from the ASF dual-hosted git repository. viirya pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 4caa43e [SPARK-36041][SS][DOCS] Introduce the RocksDBStateStoreProvider in the programming guide 4caa43e is described below commit 4caa43e3989998506076c24d3b7020d986249e9f Author: Yuanjian Li AuthorDate: Mon Aug 16 12:32:08 2021 -0700 [SPARK-36041][SS][DOCS] Introduce the RocksDBStateStoreProvider in the programming guide ### What changes were proposed in this pull request? Add the document for the new RocksDBStateStoreProvider. ### Why are the changes needed? User guide for the new feature. ### Does this PR introduce _any_ user-facing change? No, doc only. ### How was this patch tested? Doc only. Closes #33683 from xuanyuanking/SPARK-36041. Authored-by: Yuanjian Li Signed-off-by: Liang-Chi Hsieh (cherry picked from commit 3d57e00a7f8d8f2f7dc0bfbfb0466ef38fb3da08) Signed-off-by: Liang-Chi Hsieh --- docs/structured-streaming-programming-guide.md | 75 +- 1 file changed, 74 insertions(+), 1 deletion(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index b56d8c8..3f89111 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -1870,7 +1870,80 @@ hence the number is not same as the number of original input rows. You'd like to There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional. -### State Store and task locality +### State Store + +State store is a versioned key-value store which provides both read and write operations. In +Structured Streaming, we use the state store provider to handle the stateful operations across +batches. 
There are two built-in state store provider implementations. End users can also implement +their own state store provider by extending StateStoreProvider interface. + + HDFS state store provider + +The HDFS backend state store provider is the default implementation of [[StateStoreProvider]] and +[[StateStore]] in which all the data is stored in memory map in the first stage, and then backed +by files in an HDFS-compatible file system. All updates to the store have to be done in sets +transactionally, and each set of updates increments the store's version. These versions can be +used to re-execute the updates (by retries in RDD operations) on the correct version of the store, +and regenerate the store version. + + RocksDB state store implementation + +As of Spark 3.2, we add a new built-in state store implementation, RocksDB state store provider. + +If you have stateful operations in your streaming query (for example, streaming aggregation, +streaming dropDuplicates, stream-stream joins, mapGroupsWithState, or flatMapGroupsWithState) +and you want to maintain millions of keys in the state, then you may face issues related to large +JVM garbage collection (GC) pauses causing high variations in the micro-batch processing times. +This occurs because, by the implementation of HDFSBackedStateStore, the state data is maintained +in the JVM memory of the executors and large number of state objects puts memory pressure on the +JVM causing high GC pauses. + +In such cases, you can choose to use a more optimized state management solution based on +[RocksDB](https://rocksdb.org/). Rather than keeping the state in the JVM memory, this solution +uses RocksDB to efficiently manage the state in the native memory and the local disk. Furthermore, +any changes to this state are automatically saved by Structured Streaming to the checkpoint +location you have provided, thus providing full fault-tolerance guarantees (the same as default +state management). 
+ +To enable the new build-in state store implementation, set `spark.sql.streaming.stateStore.providerClass` +to `org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider`. + +Here are the configs regarding to RocksDB instance of the state store provider: + + + +Config Name +Description +Default Value + + +spark.sql.streaming.stateStore.rocksdb.compactOnCommit +Whether we perform a range compaction of RocksDB instance for commit operation +False + + +spark.sql.streaming.stateStore.rocksdb.blockSizeKB +Approximate size in KB of user data packed per block for a RocksDB BlockBasedTable, which is a RocksDB's default SST file format. +4 + + +spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB +The size capacity in MB for a cache of blocks. +8 + + +spark.sql.streaming.stateStore.rocksdb.lo
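The versioning contract the guide describes for the HDFS-backed provider (updates applied in transactional sets, each set producing a new store version, and retries re-executing against the correct version) can be illustrated with a toy in-memory store. This is not Spark's `StateStore` or `StateStoreProvider` API: `VersionedKVStore` and its methods are hypothetical, and nothing is persisted to a file system.

```python
from typing import Dict, Iterable, Optional


class VersionedKVStore:
    """Toy model of a versioned key-value store; every committed update set is a new version."""

    def __init__(self) -> None:
        self._versions: Dict[int, Dict[str, int]] = {0: {}}

    def latest_version(self) -> int:
        return max(self._versions)

    def get(self, version: int, key: str) -> Optional[int]:
        return self._versions[version].get(key)

    def update(self, base_version: int, puts: Dict[str, int], removes: Iterable[str] = ()) -> int:
        """Apply one set of updates atomically on top of `base_version`.

        Re-running the same updates on the same base version (e.g. a retried task)
        regenerates the same next version.
        """
        if base_version not in self._versions:
            raise KeyError(f"unknown version {base_version}")
        data = dict(self._versions[base_version])  # copy, so older versions stay readable
        data.update(puts)
        for key in removes:
            data.pop(key, None)
        new_version = base_version + 1
        self._versions[new_version] = data  # published only after the whole set is applied
        return new_version
```

Because each version is derived only from its base version and the update set, a retried batch that replays its updates on the same base reproduces the same next version rather than corrupting newer state.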
[spark] branch branch-3.2 updated: [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message in BlockPushErrorHandler in client
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 2fb62e0 [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message in BlockPushErrorHandler in client 2fb62e0 is described below commit 2fb62e0e3d40835a3e61fcf210e0772cd0d21a68 Author: zhuqi-lucas <821684...@qq.com> AuthorDate: Mon Aug 16 13:58:48 2021 -0500 [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message in BlockPushErrorHandler in client ### What changes were proposed in this pull request? Add a new type of error message in BlockPushErrorHandler which indicates the PushblockStream message is received after a new application attempt has started. This error message should be correctly handled in client without retrying the block push. ### Why are the changes needed? When we get a block push failure because of the too old attempt, we will not retry pushing the block nor log the exception on the client side. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Add the corresponding unit test. Closes #33617 from zhuqi-lucas/master. 
Authored-by: zhuqi-lucas <821684...@qq.com> Signed-off-by: Mridul Muralidharan gmail.com> (cherry picked from commit 05cd5f97c3dea25dacdbdb319243cdab9667c774) Signed-off-by: Mridul Muralidharan --- .../network/server/BlockPushNonFatalFailure.java | 22 +++- .../network/server/TransportRequestHandler.java| 13 -- .../apache/spark/network/shuffle/ErrorHandler.java | 7 ++--- .../network/shuffle/RemoteBlockPushResolver.java | 30 -- .../spark/network/shuffle/ErrorHandlerSuite.java | 4 +++ .../shuffle/RemoteBlockPushResolverSuite.java | 17 +++- .../apache/spark/shuffle/ShuffleBlockPusher.scala | 10 +++- .../spark/shuffle/ShuffleBlockPusherSuite.scala| 6 + 8 files changed, 77 insertions(+), 32 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java index 5906fa2..4bb22b2 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java @@ -40,6 +40,14 @@ public class BlockPushNonFatalFailure extends RuntimeException { " is received after merged shuffle is finalized"; /** + * String constant used for generating exception messages indicating the application attempt is + * not the latest attempt on the server side. When we get a block push failure because of the too + * old attempt, we will not retry pushing the block nor log the exception on the client side. + */ + public static final String TOO_OLD_ATTEMPT_SUFFIX = +" is from an older app attempt"; + + /** * String constant used for generating exception messages indicating a block to be merged * is a stale block push in the case of indeterminate stage retries on the server side. 
* When we get a block push failure because of the block push being stale, we will not @@ -124,7 +132,12 @@ public class BlockPushNonFatalFailure extends RuntimeException { * indeterminate stage retries. When the client receives this code, it will not retry * pushing the block. */ -STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX); +STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX), +/** + * Indicate the application attempt is not the latest attempt on the server side. + * When the client gets this code, it will not retry pushing the block. + */ +TOO_OLD_ATTEMPT_PUSH(4, TOO_OLD_ATTEMPT_SUFFIX); private final byte id; // Error message suffix used to generate an error message for a given ReturnCode and @@ -146,10 +159,17 @@ public class BlockPushNonFatalFailure extends RuntimeException { case 1: return ReturnCode.TOO_LATE_BLOCK_PUSH; case 2: return ReturnCode.BLOCK_APPEND_COLLISION_DETECTED; case 3: return ReturnCode.STALE_BLOCK_PUSH; + case 4: return ReturnCode.TOO_OLD_ATTEMPT_PUSH; default: throw new IllegalArgumentException("Unknown block push return code: " + id); } } + public static boolean shouldNotRetryErrorCode(ReturnCode returnCode) { +return returnCode == ReturnCode.TOO_LATE_BLOCK_PUSH || + returnCode == ReturnCode.STALE_BLOCK_PUSH || + returnCode == ReturnCode.TOO_OLD_ATTEMPT_PUSH; + } + public static String getErrorMsg(String blockId, ReturnCode errorCode) { Preconditions.checkArgument(errorCode != ReturnCode.SUCCESS); return "Block " + blockId +
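The return-code handling added to `BlockPushNonFatalFailure` can be mirrored in Python to show how a client decides whether to retry a failed push. This is an illustrative model, not Spark's Java API: the names below are hypothetical, and only the numeric ids, the "do not retry" set, and the `TOO_OLD_ATTEMPT_SUFFIX` text are taken from the diff.

```python
from enum import IntEnum


class ReturnCode(IntEnum):
    """Python mirror of the byte ids used by BlockPushNonFatalFailure.ReturnCode."""
    SUCCESS = 0
    TOO_LATE_BLOCK_PUSH = 1
    BLOCK_APPEND_COLLISION_DETECTED = 2
    STALE_BLOCK_PUSH = 3
    TOO_OLD_ATTEMPT_PUSH = 4  # added by SPARK-35548


# Codes for which the client gives up on the block instead of retrying the push.
NO_RETRY_CODES = frozenset({
    ReturnCode.TOO_LATE_BLOCK_PUSH,
    ReturnCode.STALE_BLOCK_PUSH,
    ReturnCode.TOO_OLD_ATTEMPT_PUSH,
})

TOO_OLD_ATTEMPT_SUFFIX = " is from an older app attempt"


def return_code_from_id(code_id: int) -> ReturnCode:
    """Decode a wire id, rejecting unknown values like the Java switch statement does."""
    try:
        return ReturnCode(code_id)
    except ValueError:
        raise ValueError(f"Unknown block push return code: {code_id}") from None


def should_not_retry(code: ReturnCode) -> bool:
    return code in NO_RETRY_CODES


def too_old_attempt_msg(block_id: str) -> str:
    # Same shape as getErrorMsg: "Block " + blockId + suffix.
    return "Block " + block_id + TOO_OLD_ATTEMPT_SUFFIX
```

Note that `BLOCK_APPEND_COLLISION_DETECTED` is deliberately absent from the no-retry set, so a collision still leads to a retry.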
[spark] branch master updated: [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message in BlockPushErrorHandler in client
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 05cd5f9 [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message in BlockPushErrorHandler in client 05cd5f9 is described below commit 05cd5f97c3dea25dacdbdb319243cdab9667c774 Author: zhuqi-lucas <821684...@qq.com> AuthorDate: Mon Aug 16 13:58:48 2021 -0500 [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message in BlockPushErrorHandler in client ### What changes were proposed in this pull request? Add a new type of error message in BlockPushErrorHandler which indicates the PushblockStream message is received after a new application attempt has started. This error message should be correctly handled in client without retrying the block push. ### Why are the changes needed? When we get a block push failure because of the too old attempt, we will not retry pushing the block nor log the exception on the client side. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Add the corresponding unit test. Closes #33617 from zhuqi-lucas/master. 
Authored-by: zhuqi-lucas <821684...@qq.com> Signed-off-by: Mridul Muralidharan gmail.com> --- .../network/server/BlockPushNonFatalFailure.java | 22 +++- .../network/server/TransportRequestHandler.java| 13 -- .../apache/spark/network/shuffle/ErrorHandler.java | 7 ++--- .../network/shuffle/RemoteBlockPushResolver.java | 30 -- .../spark/network/shuffle/ErrorHandlerSuite.java | 4 +++ .../shuffle/RemoteBlockPushResolverSuite.java | 17 +++- .../apache/spark/shuffle/ShuffleBlockPusher.scala | 10 +++- .../spark/shuffle/ShuffleBlockPusherSuite.scala| 6 + 8 files changed, 77 insertions(+), 32 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java index 5906fa2..4bb22b2 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java @@ -40,6 +40,14 @@ public class BlockPushNonFatalFailure extends RuntimeException { " is received after merged shuffle is finalized"; /** + * String constant used for generating exception messages indicating the application attempt is + * not the latest attempt on the server side. When we get a block push failure because of the too + * old attempt, we will not retry pushing the block nor log the exception on the client side. + */ + public static final String TOO_OLD_ATTEMPT_SUFFIX = +" is from an older app attempt"; + + /** * String constant used for generating exception messages indicating a block to be merged * is a stale block push in the case of indeterminate stage retries on the server side. * When we get a block push failure because of the block push being stale, we will not @@ -124,7 +132,12 @@ public class BlockPushNonFatalFailure extends RuntimeException { * indeterminate stage retries. 
When the client receives this code, it will not retry * pushing the block. */ -STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX); +STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX), +/** + * Indicate the application attempt is not the latest attempt on the server side. + * When the client gets this code, it will not retry pushing the block. + */ +TOO_OLD_ATTEMPT_PUSH(4, TOO_OLD_ATTEMPT_SUFFIX); private final byte id; // Error message suffix used to generate an error message for a given ReturnCode and @@ -146,10 +159,17 @@ public class BlockPushNonFatalFailure extends RuntimeException { case 1: return ReturnCode.TOO_LATE_BLOCK_PUSH; case 2: return ReturnCode.BLOCK_APPEND_COLLISION_DETECTED; case 3: return ReturnCode.STALE_BLOCK_PUSH; + case 4: return ReturnCode.TOO_OLD_ATTEMPT_PUSH; default: throw new IllegalArgumentException("Unknown block push return code: " + id); } } + public static boolean shouldNotRetryErrorCode(ReturnCode returnCode) { +return returnCode == ReturnCode.TOO_LATE_BLOCK_PUSH || + returnCode == ReturnCode.STALE_BLOCK_PUSH || + returnCode == ReturnCode.TOO_OLD_ATTEMPT_PUSH; + } + public static String getErrorMsg(String blockId, ReturnCode errorCode) { Preconditions.checkArgument(errorCode != ReturnCode.SUCCESS); return "Block " + blockId + errorCode.errorMsgSuffix; diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportReq
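The client-side decision added here reduces to decoding a one-byte return code and checking it against a fixed set of non-retryable codes. The Python sketch below mirrors that logic for illustration only. The ids 1 to 4 come from the `getReturnCode` switch in the diff; `SUCCESS = 0` is an assumption because its `case` is not shown, and `should_retry_push` is a hypothetical helper, not a Spark API.

```python
from enum import Enum


class ReturnCode(Enum):
    SUCCESS = 0  # assumption: this id is not shown in the diff
    TOO_LATE_BLOCK_PUSH = 1
    BLOCK_APPEND_COLLISION_DETECTED = 2
    STALE_BLOCK_PUSH = 3
    TOO_OLD_ATTEMPT_PUSH = 4  # new in SPARK-35548


def get_return_code(code_id: int) -> ReturnCode:
    """Decode a wire id, failing on unknown ids like the Java `default` branch."""
    try:
        return ReturnCode(code_id)
    except ValueError:
        raise ValueError(f"Unknown block push return code: {code_id}") from None


# Codes after which the client stops pushing the block instead of retrying,
# mirroring shouldNotRetryErrorCode in the diff.
NO_RETRY_CODES = frozenset({
    ReturnCode.TOO_LATE_BLOCK_PUSH,
    ReturnCode.STALE_BLOCK_PUSH,
    ReturnCode.TOO_OLD_ATTEMPT_PUSH,
})


def should_not_retry(code: ReturnCode) -> bool:
    return code in NO_RETRY_CODES


def should_retry_push(code_id: int) -> bool:
    """Hypothetical client helper: retry only failures that may succeed later."""
    code = get_return_code(code_id)
    return code is not ReturnCode.SUCCESS and not should_not_retry(code)
```

Keeping the non-retryable set in one place means a future code only needs to be added to that set, which is the same shape the Java helper takes.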
[spark] branch branch-3.2 updated: [SPARK-36469][PYTHON] Implement Index.map
This is an automated email from the ASF dual-hosted git repository. ueshin pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new cb14a32 [SPARK-36469][PYTHON] Implement Index.map cb14a32 is described below commit cb14a3200524ab757f7e68b35e08c43467c29d8b Author: Xinrong Meng AuthorDate: Mon Aug 16 11:06:10 2021 -0700 [SPARK-36469][PYTHON] Implement Index.map ### What changes were proposed in this pull request? Implement `Index.map`. The PR is based on https://github.com/databricks/koalas/pull/2136. Thanks awdavidson for the prototype. `map` of CategoricalIndex and DatetimeIndex will be implemented in separate PRs. ### Why are the changes needed? Mapping values using input correspondence (a dict, Series, or function) is supported in pandas as [Index.map](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Index.map.html). We should support it as well. ### Does this PR introduce _any_ user-facing change? Yes. `Index.map` is available now. ```py >>> psidx = ps.Index([1, 2, 3]) >>> psidx.map({1: "one", 2: "two", 3: "three"}) Index(['one', 'two', 'three'], dtype='object') >>> psidx.map(lambda id: "{id} + 1".format(id=id)) Index(['1 + 1', '2 + 1', '3 + 1'], dtype='object') >>> pser = pd.Series(["one", "two", "three"], index=[1, 2, 3]) >>> psidx.map(pser) Index(['one', 'two', 'three'], dtype='object') ``` ### How was this patch tested? Unit tests. Closes #33694 from xinrong-databricks/index_map.
Authored-by: Xinrong Meng Signed-off-by: Takuya UESHIN (cherry picked from commit 4dcd74602571d36a3b9129f0886e1cfc33d7fdc8) Signed-off-by: Takuya UESHIN --- .../source/reference/pyspark.pandas/indexing.rst | 1 + python/pyspark/pandas/indexes/base.py | 46 +- python/pyspark/pandas/indexes/category.py | 7 ++ python/pyspark/pandas/indexes/datetimes.py | 9 ++- python/pyspark/pandas/indexes/multi.py | 7 ++ python/pyspark/pandas/missing/indexes.py | 2 +- python/pyspark/pandas/tests/indexes/test_base.py | 74 ++ 7 files changed, 143 insertions(+), 3 deletions(-) diff --git a/python/docs/source/reference/pyspark.pandas/indexing.rst b/python/docs/source/reference/pyspark.pandas/indexing.rst index 677d80f..9d53f00 100644 --- a/python/docs/source/reference/pyspark.pandas/indexing.rst +++ b/python/docs/source/reference/pyspark.pandas/indexing.rst @@ -64,6 +64,7 @@ Modifying and computations Index.drop_duplicates Index.min Index.max + Index.map Index.rename Index.repeat Index.take diff --git a/python/pyspark/pandas/indexes/base.py b/python/pyspark/pandas/indexes/base.py index 6c842bc..a43a5d1 100644 --- a/python/pyspark/pandas/indexes/base.py +++ b/python/pyspark/pandas/indexes/base.py @@ -16,7 +16,7 @@ # from functools import partial -from typing import Any, Iterator, List, Optional, Tuple, Union, cast, no_type_check +from typing import Any, Callable, Iterator, List, Optional, Tuple, Union, cast, no_type_check import warnings import pandas as pd @@ -521,6 +521,50 @@ class Index(IndexOpsMixin): result = result.copy() return result +def map( +self, mapper: Union[dict, Callable[[Any], Any], pd.Series], na_action: Optional[str] = None +) -> "Index": +""" +Map values using input correspondence (a dict, Series, or function). + +Parameters +-- +mapper : function, dict, or pd.Series +Mapping correspondence. +na_action : {None, 'ignore'} +If ‘ignore’, propagate NA values, without passing them to the mapping correspondence. 
+ +Returns +--- +applied : Index, inferred +The output of the mapping function applied to the index. + +Examples + +>>> psidx = ps.Index([1, 2, 3]) + +>>> psidx.map({1: "one", 2: "two", 3: "three"}) +Index(['one', 'two', 'three'], dtype='object') + +>>> psidx.map(lambda id: "{id} + 1".format(id=id)) +Index(['1 + 1', '2 + 1', '3 + 1'], dtype='object') + +>>> pser = pd.Series(["one", "two", "three"], index=[1, 2, 3]) +>>> psidx.map(pser) +Index(['one', 'two', 'three'], dtype='object') +""" +if isinstance(mapper, dict): +if len(set(type(k) for k in mapper.values())) > 1: +raise TypeError( +"If the mapper is a dictionary, its values must be of the same type" +) + +return Index( +self.to_series().pandas_on_spark.transform_batch( +lambda pser: pser.map(mapper, na_action) +) +).rename(self.name) + @
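Per the diff, `Index.map` rejects dict mappers whose values have mixed types and otherwise delegates to pandas `Series.map(mapper, na_action)` batch by batch through `transform_batch`. As a rough, hypothetical illustration of those semantics on a plain Python list (not the pandas-on-Spark code path), assuming unmatched dict keys become `None` where pandas would produce NaN:

```python
import math
from typing import Any, Callable, Dict, List, Optional, Union

Mapper = Union[Dict[Any, Any], Callable[[Any], Any]]


def _is_na(value: Any) -> bool:
    # Treat None and float NaN as missing, loosely following pandas.
    return value is None or (isinstance(value, float) and math.isnan(value))


def map_values(values: List[Any], mapper: Mapper,
               na_action: Optional[str] = None) -> List[Any]:
    if na_action not in (None, "ignore"):
        raise ValueError("na_action must be None or 'ignore'")
    if isinstance(mapper, dict):
        # Same guard as the diff: all dict values must share one type.
        if len({type(v) for v in mapper.values()}) > 1:
            raise TypeError(
                "If the mapper is a dictionary, its values must be of the same type")

        def func(v: Any) -> Any:
            return mapper.get(v)  # unmatched key -> None (pandas would give NaN)
    else:
        func = mapper
    # With na_action="ignore", missing values skip the mapper entirely.
    return [v if (na_action == "ignore" and _is_na(v)) else func(v) for v in values]
```

With `na_action='ignore'`, missing values pass through untouched instead of being handed to the mapper, which is what the docstring means by propagating NA values.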
[spark] branch master updated: [SPARK-36469][PYTHON] Implement Index.map
This is an automated email from the ASF dual-hosted git repository. ueshin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4dcd746 [SPARK-36469][PYTHON] Implement Index.map 4dcd746 is described below commit 4dcd74602571d36a3b9129f0886e1cfc33d7fdc8 Author: Xinrong Meng AuthorDate: Mon Aug 16 11:06:10 2021 -0700 [SPARK-36469][PYTHON] Implement Index.map ### What changes were proposed in this pull request? Implement `Index.map`. The PR is based on https://github.com/databricks/koalas/pull/2136. Thanks awdavidson for the prototype. `map` of CategoricalIndex and DatetimeIndex will be implemented in separate PRs. ### Why are the changes needed? Mapping values using input correspondence (a dict, Series, or function) is supported in pandas as [Index.map](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Index.map.html). We should support it as well. ### Does this PR introduce _any_ user-facing change? Yes. `Index.map` is available now. ```py >>> psidx = ps.Index([1, 2, 3]) >>> psidx.map({1: "one", 2: "two", 3: "three"}) Index(['one', 'two', 'three'], dtype='object') >>> psidx.map(lambda id: "{id} + 1".format(id=id)) Index(['1 + 1', '2 + 1', '3 + 1'], dtype='object') >>> pser = pd.Series(["one", "two", "three"], index=[1, 2, 3]) >>> psidx.map(pser) Index(['one', 'two', 'three'], dtype='object') ``` ### How was this patch tested? Unit tests. Closes #33694 from xinrong-databricks/index_map.
Authored-by: Xinrong Meng Signed-off-by: Takuya UESHIN --- .../source/reference/pyspark.pandas/indexing.rst | 1 + python/pyspark/pandas/indexes/base.py | 46 +- python/pyspark/pandas/indexes/category.py | 7 ++ python/pyspark/pandas/indexes/datetimes.py | 9 ++- python/pyspark/pandas/indexes/multi.py | 7 ++ python/pyspark/pandas/missing/indexes.py | 2 +- python/pyspark/pandas/tests/indexes/test_base.py | 74 ++ 7 files changed, 143 insertions(+), 3 deletions(-) diff --git a/python/docs/source/reference/pyspark.pandas/indexing.rst b/python/docs/source/reference/pyspark.pandas/indexing.rst index 677d80f..9d53f00 100644 --- a/python/docs/source/reference/pyspark.pandas/indexing.rst +++ b/python/docs/source/reference/pyspark.pandas/indexing.rst @@ -64,6 +64,7 @@ Modifying and computations Index.drop_duplicates Index.min Index.max + Index.map Index.rename Index.repeat Index.take diff --git a/python/pyspark/pandas/indexes/base.py b/python/pyspark/pandas/indexes/base.py index ccdb54c..9d0d75a 100644 --- a/python/pyspark/pandas/indexes/base.py +++ b/python/pyspark/pandas/indexes/base.py @@ -16,7 +16,7 @@ # from functools import partial -from typing import Any, Iterator, List, Optional, Tuple, Union, cast, no_type_check +from typing import Any, Callable, Iterator, List, Optional, Tuple, Union, cast, no_type_check import warnings import pandas as pd @@ -521,6 +521,50 @@ class Index(IndexOpsMixin): result = result.copy() return result +def map( +self, mapper: Union[dict, Callable[[Any], Any], pd.Series], na_action: Optional[str] = None +) -> "Index": +""" +Map values using input correspondence (a dict, Series, or function). + +Parameters +-- +mapper : function, dict, or pd.Series +Mapping correspondence. +na_action : {None, 'ignore'} +If ‘ignore’, propagate NA values, without passing them to the mapping correspondence. + +Returns +--- +applied : Index, inferred +The output of the mapping function applied to the index. 
+ +Examples + +>>> psidx = ps.Index([1, 2, 3]) + +>>> psidx.map({1: "one", 2: "two", 3: "three"}) +Index(['one', 'two', 'three'], dtype='object') + +>>> psidx.map(lambda id: "{id} + 1".format(id=id)) +Index(['1 + 1', '2 + 1', '3 + 1'], dtype='object') + +>>> pser = pd.Series(["one", "two", "three"], index=[1, 2, 3]) +>>> psidx.map(pser) +Index(['one', 'two', 'three'], dtype='object') +""" +if isinstance(mapper, dict): +if len(set(type(k) for k in mapper.values())) > 1: +raise TypeError( +"If the mapper is a dictionary, its values must be of the same type" +) + +return Index( +self.to_series().pandas_on_spark.transform_batch( +lambda pser: pser.map(mapper, na_action) +) +).rename(self.name) + @property def values(self) -> np.ndarray: """ diff --git a/python/pyspark/pandas/indexes/category.py
[spark] branch branch-3.2 updated: [SPARK-32210][CORE] Fix NegativeArraySizeException in MapOutputTracker with large spark.default.parallelism
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 9149cad [SPARK-32210][CORE] Fix NegativeArraySizeException in MapOutputTracker with large spark.default.parallelism 9149cad is described below commit 9149cad57d04f51e246f7a61cd62577cbec73190 Author: Kazuyuki Tanimura AuthorDate: Mon Aug 16 09:11:39 2021 -0700 [SPARK-32210][CORE] Fix NegativeArraySizeException in MapOutputTracker with large spark.default.parallelism ### What changes were proposed in this pull request? The current `MapOutputTracker` class may throw `NegativeArraySizeException` with a large number of partitions. The `serializeOutputStatuses()` method compresses an array of mapStatuses and writes the binary data into an (Apache) `ByteArrayOutputStream`. Inside `(Apache)ByteArrayOutputStream.toByteArray()`, a negative array size results because the index is an `int` and overflows (2GB limit) when the output binary is too large. This PR proposes two high-level ideas: 1. Use `org.apache.spark.util.io.ChunkedByteBufferOutputStream`, which has a way to output the underlying buffer as `Array[Array[Byte]]`. 2. Change the signatures from `Array[Byte]` to `Array[Array[Byte]]` in order to handle over 2GB of compressed data. ### Why are the changes needed? This issue seems to have been missed in the earlier effort to address 2GB limitations [SPARK-6235](https://issues.apache.org/jira/browse/SPARK-6235). Without this fix, `spark.default.parallelism` needs to be kept at a low number. The drawback of a smaller `spark.default.parallelism` is that it requires more executor memory (more data per partition). Setting `spark.io.compression.zstd.level` to a higher number (default 1) hardly helps. That essentially means there is a data size limit for shuffling that does not scale.
### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Passed existing tests ``` build/sbt "core/testOnly org.apache.spark.MapOutputTrackerSuite" ``` Also added a new unit test ``` build/sbt "core/testOnly org.apache.spark.MapOutputTrackerSuite -- -z SPARK-32210" ``` Ran the benchmark using GitHub Actions and did not observe any performance penalties. The results are attached in this PR: ``` core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt core/benchmarks/MapStatusesSerDeserBenchmark-results.txt ``` Closes #33721 from kazuyukitanimura/SPARK-32210. Authored-by: Kazuyuki Tanimura Signed-off-by: Dongjoon Hyun (cherry picked from commit 8ee464cd7a09302cacc47a4cbc98fdf307f39dbd) Signed-off-by: Dongjoon Hyun --- .../MapStatusesSerDeserBenchmark-jdk11-results.txt | 54 .../MapStatusesSerDeserBenchmark-results.txt | 54 .../scala/org/apache/spark/MapOutputTracker.scala | 52 +--- .../org/apache/spark/MapOutputTrackerSuite.scala | 72 +- .../spark/MapStatusesSerDeserBenchmark.scala | 4 +- 5 files changed, 156 insertions(+), 80 deletions(-) diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt index 29699a2..0481630 100644 --- a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt +++ b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt @@ -1,64 +1,64 @@ -OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure +Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz 20 MapOutputs, 10 blocks w/ broadcast: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative - -Serialization179194 9 1.1 897.4 1.0X -Deserialization 254321 74 0.81271.0 0.7X +Serialization148164 8 1.4 739.6 1.0X +Deserialization 202303 72 1.01009.9 0.7X -Compressed Serialized MapStatus sizes: 409 bytes +Compressed Serialized MapStatus sizes:
412 bytes Compressed Serialized Broadcast MapStatus sizes: 2 MB -OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure -Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz +OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
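The failure mode described for SPARK-32210 comes from sizing a single array with a 32-bit `int`: once the serialized output passes 2^31 - 1 bytes, the length wraps negative. The Python sketch below emulates that wrap-around, then shows a chunked writer that keeps every piece bounded and returns a list of chunks, in the spirit of the `Array[Array[Byte]]` signature change. `ChunkedOutput` and `as_java_int` are hypothetical names, and this is not Spark's `ChunkedByteBufferOutputStream`.

```python
from typing import List

INT32_MAX = 2**31 - 1


def as_java_int(n: int) -> int:
    """Wrap an arbitrary integer to a signed 32-bit value, like Java int arithmetic."""
    n &= 0xFFFFFFFF
    return n - 2**32 if n > INT32_MAX else n


class ChunkedOutput:
    """Hypothetical chunked buffer: data lives in a list of bounded chunks,
    so no single chunk ever needs an index beyond chunk_size."""

    def __init__(self, chunk_size: int) -> None:
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        self.chunk_size = chunk_size
        self.chunks: List[bytearray] = []
        self.size = 0  # Python int: the total is not capped at 2**31 - 1

    def write(self, data: bytes) -> None:
        view = memoryview(data)
        while view:
            # Start a new chunk when there is none yet or the last one is full.
            if not self.chunks or len(self.chunks[-1]) == self.chunk_size:
                self.chunks.append(bytearray())
            last = self.chunks[-1]
            room = self.chunk_size - len(last)
            last += view[:room]
            view = view[room:]
        self.size += len(data)

    def to_chunked_array(self) -> List[bytes]:
        # Analogous to returning Array[Array[Byte]] instead of one Array[Byte].
        return [bytes(c) for c in self.chunks]
```

For example, `as_java_int(2**31 + 5)` is `-2147483643`, the kind of negative size that makes a single-array allocation fail, while the chunked writer only ever tracks the total as an unbounded integer.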
[spark] branch master updated (f620996 -> 8ee464c)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from f620996 [SPARK-36418][SQL] Use CAST in parsing of dates/timestamps with default pattern add 8ee464c [SPARK-32210][CORE] Fix NegativeArraySizeException in MapOutputTracker with large spark.default.parallelism No new revisions were added by this update. Summary of changes: .../MapStatusesSerDeserBenchmark-jdk11-results.txt | 54 .../MapStatusesSerDeserBenchmark-results.txt | 54 .../scala/org/apache/spark/MapOutputTracker.scala | 52 +--- .../org/apache/spark/MapOutputTrackerSuite.scala | 72 +- .../spark/MapStatusesSerDeserBenchmark.scala | 4 +- 5 files changed, 156 insertions(+), 80 deletions(-)
[spark] branch master updated: [SPARK-36418][SQL] Use CAST in parsing of dates/timestamps with default pattern
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f620996 [SPARK-36418][SQL] Use CAST in parsing of dates/timestamps with default pattern f620996 is described below commit f620996142ba312f7e52f75476b1b18be667ffdf Author: Max Gekk AuthorDate: Mon Aug 16 23:29:33 2021 +0800 [SPARK-36418][SQL] Use CAST in parsing of dates/timestamps with default pattern ### What changes were proposed in this pull request? In this PR, I propose to use the `CAST` logic when the pattern is not specified in `DateFormatter` or `TimestampFormatter`. In particular, `DateTimeUtils.stringToTimestampAnsi()` or `stringToDateAnsi()` is invoked in that case. ### Why are the changes needed? 1. This can improve user experience with Spark SQL by making the default date/timestamp parsers more flexible and tolerant of their inputs. 2. It makes the default case consistent with the behavior of the `CAST` expression, which makes the implementation more consistent. ### Does this PR introduce _any_ user-facing change? The changes shouldn't introduce behavior changes in regular cases, but they can affect corner cases. The new implementation is able to parse more dates/timestamps by default. For instance, the old (current) date parser can recognize dates only in the format **yyyy-MM-dd**, but the new one can handle: * `[+-]yyyy*` * `[+-]yyyy*-[m]m` * `[+-]yyyy*-[m]m-[d]d` * `[+-]yyyy*-[m]m-[d]d ` * `[+-]yyyy*-[m]m-[d]d *` * `[+-]yyyy*-[m]m-[d]dT*` Similarly for timestamps.
The old (current) timestamp formatter can parse timestamps only in the format **yyyy-MM-dd HH:mm:ss** by default, but the new implementation can handle: * `[+-]yyyy*` * `[+-]yyyy*-[m]m` * `[+-]yyyy*-[m]m-[d]d` * `[+-]yyyy*-[m]m-[d]d ` * `[+-]yyyy*-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]` * `[+-]yyyy*-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]` * `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]` * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]` ### How was this patch tested? By running the affected test suites: ``` $ build/sbt "test:testOnly *ImageFileFormatSuite" $ build/sbt "test:testOnly *ParquetV2PartitionDiscoverySuite" ``` Closes #33709 from MaxGekk/datetime-cast-default-pattern. Authored-by: Max Gekk Signed-off-by: Wenchen Fan --- .../ml/source/image/ImageFileFormatSuite.scala | 17 ++- .../spark/sql/catalyst/util/DateFormatter.scala| 33 ++--- .../sql/catalyst/util/TimestampFormatter.scala | 34 +++--- .../sql/catalyst/util/DateFormatterSuite.scala | 22 ++ .../catalyst/util/TimestampFormatterSuite.scala| 25 .../parquet/ParquetPartitionDiscoverySuite.scala | 2 +- 6 files changed, 116 insertions(+), 17 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala index 0ec2747..7dca81e 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.ml.source.image import java.net.URI import java.nio.file.Paths +import java.sql.Date import org.apache.spark.SparkFunSuite import org.apache.spark.ml.image.ImageSchema._ @@ -95,14 +96,14 @@ class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext { .collect() assert(Set(result: _*) === Set( - Row("29.5.a_b_EGDP022204.jpg", "kittens", "2018-01"), - Row("54893.jpg", "kittens", "2018-02"), -
Row("DP153539.jpg", "kittens", "2018-02"), - Row("DP802813.jpg", "kittens", "2018-02"), - Row("BGRA.png", "multichannel", "2018-01"), - Row("BGRA_alpha_60.png", "multichannel", "2018-01"), - Row("chr30.4.184.jpg", "multichannel", "2018-02"), - Row("grayscale.jpg", "multichannel", "2018-02") + Row("29.5.a_b_EGDP022204.jpg", "kittens", Date.valueOf("2018-01-01")), + Row("54893.jpg", "kittens", Date.valueOf("2018-02-01")), + Row("DP153539.jpg", "kittens", Date.valueOf("2018-02-01")), + Row("DP802813.jpg", "kittens", Date.valueOf("2018-02-01")), + Row("BGRA.png", "multichannel", Date.valueOf("2018-01-01")), + Row("BGRA_alpha_60.png", "multichannel", Date.valueOf("2018-01-01")), + Row("chr30.4.184.jpg", "multichannel", Date.valueOf("2018-02-01")), + Row("grayscale.jpg", "multichannel", Date.valueOf("2018-02-01")) )) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatt
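To make the new default date forms concrete (`[+-]yyyy*`, optionally followed by `-[m]m`, then `-[d]d`, then a space or `T` and an arbitrary tail), here is a simplified, hypothetical Python parser. It fills a missing month or day with 1, which matches the updated `ImageFileFormatSuite` expectations where a partition value such as `2018-01` now becomes `Date.valueOf("2018-01-01")`. It is not `stringToDateAnsi`. It assumes a year of at least four digits, checks month and day only against the ranges 1-12 and 1-31, and ignores time zones and calendar validity.

```python
import re
from typing import Optional, Tuple

# [+-]yyyy* then optional -[m]m, optional -[d]d, optional (' ' or 'T') + anything.
# Assumption: the year has at least four digits.
_DATE_RE = re.compile(r"([+-]?)(\d{4,})(?:-(\d{1,2})(?:-(\d{1,2})(?:[ T].*)?)?)?")


def parse_date_lenient(s: str) -> Optional[Tuple[int, int, int]]:
    """Return (year, month, day), or None if the string does not match."""
    m = _DATE_RE.fullmatch(s.strip())
    if m is None:
        return None
    sign, year, month, day = m.groups()
    y = -int(year) if sign == "-" else int(year)
    mo = int(month) if month else 1  # missing month defaults to January
    d = int(day) if day else 1       # missing day defaults to the 1st
    if not (1 <= mo <= 12 and 1 <= d <= 31):
        return None
    return (y, mo, d)
```

Anything after the day is accepted but discarded, which is how a date parser can tolerate inputs like `2018-02-01T10:00:00`.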
[spark] branch branch-3.2 updated: [SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level user documentation
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 233af3d [SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level user documentation 233af3d is described below commit 233af3d2396391185c45b69c4bd2bf9ca8f1af67 Author: Venkata krishnan Sowrirajan AuthorDate: Mon Aug 16 10:24:40 2021 -0500 [SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level user documentation ### What changes were proposed in this pull request? Document the push-based shuffle feature with a high-level overview of the feature and the corresponding configuration options for both the shuffle server side and the client side. This is how the changes to the doc look in the browser ([img](https://user-images.githubusercontent.com/8871522/129231582-ad86ee2f-246f-4b42-9528-4ccd693e86d2.png)) ### Why are the changes needed? Helps users understand the feature. ### Does this PR introduce _any_ user-facing change? Docs ### How was this patch tested? N/A Closes #33615 from venkata91/SPARK-36374.
Authored-by: Venkata krishnan Sowrirajan Signed-off-by: Mridul Muralidharan gmail.com> (cherry picked from commit 2270ecf32f7ae478570145219d2ce71a642076cf) Signed-off-by: Mridul Muralidharan --- .../apache/spark/network/util/TransportConf.java | 28 -- .../shuffle/RemoteBlockPushResolverSuite.java | 2 +- .../org/apache/spark/internal/config/package.scala | 63 ++-- docs/configuration.md | 106 + 4 files changed, 157 insertions(+), 42 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index 69b8b25..ed0ca918 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -390,24 +390,32 @@ public class TransportConf { /** * The minimum size of a chunk when dividing a merged shuffle file into multiple chunks during * push-based shuffle. - * A merged shuffle file consists of multiple small shuffle blocks. Fetching the - * complete merged shuffle file in a single response increases the memory requirements for the - * clients. Instead of serving the entire merged file, the shuffle service serves the - * merged file in `chunks`. A `chunk` constitutes few shuffle blocks in entirety and this - * configuration controls how big a chunk can get. A corresponding index file for each merged - * shuffle file will be generated indicating chunk boundaries. + * A merged shuffle file consists of multiple small shuffle blocks. Fetching the complete + * merged shuffle file in a single disk I/O increases the memory requirements for both the + * clients and the external shuffle service. Instead, the external shuffle service serves + * the merged file in MB-sized chunks. This configuration controls how big a chunk can get. 
+ * A corresponding index file for each merged shuffle file will be generated indicating chunk + * boundaries. + * + * Setting this too high would increase the memory requirements on both the clients and the + * external shuffle service. + * + * Setting this too low would increase the overall number of RPC requests to external shuffle + * service unnecessarily. */ public int minChunkSizeInMergedShuffleFile() { return Ints.checkedCast(JavaUtils.byteStringAsBytes( - conf.get("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "2m"))); + conf.get("spark.shuffle.push.server.minChunkSizeInMergedShuffleFile", "2m"))); } /** - * The size of cache in memory which is used in push-based shuffle for storing merged index files. + * The maximum size of cache in memory which is used in push-based shuffle for storing merged + * index files. This cache is in addition to the one configured via + * spark.shuffle.service.index.cache.size. */ public long mergedIndexCacheSize() { return JavaUtils.byteStringAsBytes( - conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m")); + conf.get("spark.shuffle.push.server.mergedIndexCacheSize", "100m")); } /** @@ -417,7 +425,7 @@ public class TransportConf { * blocks for this shuffle partition. */ public int ioExceptionsThresholdDuringMerge() { -return conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4); +return conf.getInt("spark.shuffle.push.server.ioExceptionsThresholdDuringMerge", 4); } /** diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushRe
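The `spark.shuffle.push.server.minChunkSizeInMergedShuffleFile` setting (default `"2m"`) is described as a minimum chunk size, with chunks made of whole shuffle blocks and an index file recording chunk boundaries. The hypothetical Python sketch below illustrates that idea with two pieces: a simplified size-string parser that assumes binary units (`m` = MiB), and a greedy packer that closes a chunk once it reaches the minimum. The packing policy is an assumption drawn from the doc comment, not the shuffle service's actual code.

```python
import re
from typing import List

# Binary multipliers; assumption: "m" means MiB as in Spark-style size strings.
_UNITS = {"": 1, "b": 1, "k": 1 << 10, "m": 1 << 20, "g": 1 << 30, "t": 1 << 40}


def byte_string_as_bytes(s: str) -> int:
    """Simplified parser for strings like '2m', '100m', or '4096'."""
    match = re.fullmatch(r"(\d+)([bkmgt]?)b?", s.strip().lower())
    if match is None:
        raise ValueError(f"Invalid size string: {s!r}")
    number, unit = match.groups()
    return int(number) * _UNITS[unit]


def chunk_boundaries(block_sizes: List[int], min_chunk_size: int) -> List[int]:
    """Greedily pack whole blocks into chunks, closing a chunk once it reaches
    min_chunk_size. Returns cumulative end offsets, i.e. what an index file
    marking chunk boundaries could record."""
    boundaries: List[int] = []
    offset = 0
    current = 0
    for size in block_sizes:
        offset += size
        current += size
        if current >= min_chunk_size:
            boundaries.append(offset)
            current = 0
    if current > 0:
        boundaries.append(offset)  # trailing partial chunk
    return boundaries
```

A larger minimum yields fewer, larger chunks (fewer RPCs but more memory per fetch), which is the trade-off the updated doc comment spells out.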
[spark] branch master updated: [SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level user documentation
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2270ecf [SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level user documentation 2270ecf is described below commit 2270ecf32f7ae478570145219d2ce71a642076cf Author: Venkata krishnan Sowrirajan AuthorDate: Mon Aug 16 10:24:40 2021 -0500 [SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level user documentation ### What changes were proposed in this pull request? Document the push-based shuffle feature with a high-level overview of the feature and the corresponding configuration options for both the shuffle server side and the client side. This is how the changes to the doc look in the browser ([img](https://user-images.githubusercontent.com/8871522/129231582-ad86ee2f-246f-4b42-9528-4ccd693e86d2.png)) ### Why are the changes needed? Helps users understand the feature. ### Does this PR introduce _any_ user-facing change? Docs ### How was this patch tested? N/A Closes #33615 from venkata91/SPARK-36374.
Authored-by: Venkata krishnan Sowrirajan Signed-off-by: Mridul Muralidharan gmail.com> --- .../apache/spark/network/util/TransportConf.java | 28 -- .../shuffle/RemoteBlockPushResolverSuite.java | 2 +- .../org/apache/spark/internal/config/package.scala | 63 ++-- docs/configuration.md | 106 + 4 files changed, 157 insertions(+), 42 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index 69b8b25..ed0ca918 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -390,24 +390,32 @@ public class TransportConf { /** * The minimum size of a chunk when dividing a merged shuffle file into multiple chunks during * push-based shuffle. - * A merged shuffle file consists of multiple small shuffle blocks. Fetching the - * complete merged shuffle file in a single response increases the memory requirements for the - * clients. Instead of serving the entire merged file, the shuffle service serves the - * merged file in `chunks`. A `chunk` constitutes few shuffle blocks in entirety and this - * configuration controls how big a chunk can get. A corresponding index file for each merged - * shuffle file will be generated indicating chunk boundaries. + * A merged shuffle file consists of multiple small shuffle blocks. Fetching the complete + * merged shuffle file in a single disk I/O increases the memory requirements for both the + * clients and the external shuffle service. Instead, the external shuffle service serves + * the merged file in MB-sized chunks. This configuration controls how big a chunk can get. + * A corresponding index file for each merged shuffle file will be generated indicating chunk + * boundaries. 
+ * + * Setting this too high would increase the memory requirements on both the clients and the + * external shuffle service. + * + * Setting this too low would increase the overall number of RPC requests to external shuffle + * service unnecessarily. */ public int minChunkSizeInMergedShuffleFile() { return Ints.checkedCast(JavaUtils.byteStringAsBytes( - conf.get("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "2m"))); + conf.get("spark.shuffle.push.server.minChunkSizeInMergedShuffleFile", "2m"))); } /** - * The size of cache in memory which is used in push-based shuffle for storing merged index files. + * The maximum size of cache in memory which is used in push-based shuffle for storing merged + * index files. This cache is in addition to the one configured via + * spark.shuffle.service.index.cache.size. */ public long mergedIndexCacheSize() { return JavaUtils.byteStringAsBytes( - conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m")); + conf.get("spark.shuffle.push.server.mergedIndexCacheSize", "100m")); } /** @@ -417,7 +425,7 @@ public class TransportConf { * blocks for this shuffle partition. */ public int ioExceptionsThresholdDuringMerge() { -return conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4); +return conf.getInt("spark.shuffle.push.server.ioExceptionsThresholdDuringMerge", 4); } /** diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.j
[spark] branch branch-3.1 updated: Update Spark key negotiation protocol
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new 1cb0f7d Update Spark key negotiation protocol 1cb0f7d is described below commit 1cb0f7db88a4aea20ab95cb07825d13f8e0f25aa Author: Sean Owen AuthorDate: Wed Aug 11 18:04:55 2021 -0500 Update Spark key negotiation protocol --- common/network-common/pom.xml | 4 + .../spark/network/crypto/AuthClientBootstrap.java | 6 +- .../apache/spark/network/crypto/AuthEngine.java| 420 + .../{ServerResponse.java => AuthMessage.java} | 56 ++- .../spark/network/crypto/AuthRpcHandler.java | 6 +- .../spark/network/crypto/ClientChallenge.java | 101 - .../java/org/apache/spark/network/crypto/README.md | 217 --- .../spark/network/crypto/AuthEngineSuite.java | 182 ++--- .../spark/network/crypto/AuthMessagesSuite.java| 46 +-- dev/deps/spark-deps-hadoop-2.7-hive-2.3| 1 + dev/deps/spark-deps-hadoop-3.2-hive-2.3| 1 + pom.xml| 6 + 12 files changed, 431 insertions(+), 615 deletions(-) diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index 0dd6619..2233100 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -92,6 +92,10 @@ commons-crypto + com.google.crypto.tink + tink + + org.roaringbitmap RoaringBitmap diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java index 4428f0f..b555410 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java @@ -105,15 +105,15 @@ public class AuthClientBootstrap implements TransportClientBootstrap { String secretKey = secretKeyHolder.getSecretKey(appId); try (AuthEngine engine = new AuthEngine(appId, secretKey, conf)) { - ClientChallenge challenge = engine.challenge(); + AuthMessage challenge = engine.challenge(); ByteBuf challengeData = Unpooled.buffer(challenge.encodedLength()); challenge.encode(challengeData); ByteBuffer responseData = client.sendRpcSync(challengeData.nioBuffer(), conf.authRTTimeoutMs()); - ServerResponse response = ServerResponse.decodeMessage(responseData); + AuthMessage response = AuthMessage.decodeMessage(responseData); - engine.validate(response); + engine.deriveSessionCipher(challenge, response); engine.sessionCipher().addToChannel(channel); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java index c2b2edc..aadf2b5 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java @@ -17,134 +17,216 @@ package org.apache.spark.network.crypto; +import javax.crypto.spec.SecretKeySpec; import java.io.Closeable; -import java.io.IOException; -import java.math.BigInteger; import java.security.GeneralSecurityException; import java.util.Arrays; import java.util.Properties; -import javax.crypto.Cipher; -import javax.crypto.SecretKey; -import javax.crypto.SecretKeyFactory; -import javax.crypto.ShortBufferException; -import javax.crypto.spec.IvParameterSpec; -import javax.crypto.spec.PBEKeySpec; -import javax.crypto.spec.SecretKeySpec; -import static java.nio.charset.StandardCharsets.UTF_8; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.primitives.Bytes; -import org.apache.commons.crypto.cipher.CryptoCipher; -import org.apache.commons.crypto.cipher.CryptoCipherFactory; -import org.apache.commons.crypto.random.CryptoRandom; -import org.apache.commons.crypto.random.CryptoRandomFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import com.google.crypto.tink.subtle.AesGcmJce; +import com.google.crypto.tink.subtle.Hkdf; +import com.google.crypto.tink.subtle.Random; +import com.google.crypto.tink.subtle.X25519; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import static java.nio.charset.StandardCharsets.UTF_8; import org.apache.spark.network.util.TransportConf; /** - * A helper class for abstracting authentication and key negotiation details. This is used by - * both client and server sides, since the operations are basically the same. + * A helper class for abstracting au
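The key negotiation rewrite in this commit swaps the commons-crypto imports for Tink's X25519, Hkdf, and AesGcmJce, which suggests an ephemeral X25519 key agreement whose output is run through HKDF to derive AES-GCM session keys. As a hedged illustration of that general pattern only, here is a minimal JDK-only sketch (it assumes Java 11+, where X25519 is built in). It does not reproduce Spark's AuthEngine message format and does not call Tink; the class name KeyNegotiationSketch, the use of the pre-shared app secret as the HKDF salt, and the "hypothetical-session-key" info label are illustrative assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PublicKey;
import java.security.SecureRandom;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.KeyAgreement;
import javax.crypto.Mac;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

/**
 * Hypothetical sketch (JDK 11+ only; not Spark's AuthEngine, not Tink's API):
 * ephemeral X25519 agreement, HKDF-SHA256 key derivation, AES-GCM encryption.
 */
public class KeyNegotiationSketch {
  private static final SecureRandom RANDOM = new SecureRandom();

  // Fresh ephemeral X25519 key pair.
  static KeyPair newKeyPair() throws GeneralSecurityException {
    return KeyPairGenerator.getInstance("X25519").generateKeyPair();
  }

  // Raw X25519 shared secret from our private key and the peer's public key.
  static byte[] agree(KeyPair mine, PublicKey peer) throws GeneralSecurityException {
    KeyAgreement ka = KeyAgreement.getInstance("X25519");
    ka.init(mine.getPrivate());
    ka.doPhase(peer, true);
    return ka.generateSecret();
  }

  // HKDF-SHA256 per RFC 5869: extract a PRK using the salt, then expand to `length` bytes.
  static byte[] hkdf(byte[] salt, byte[] ikm, byte[] info, int length)
      throws GeneralSecurityException {
    Mac mac = Mac.getInstance("HmacSHA256");
    mac.init(new SecretKeySpec(salt, "HmacSHA256"));
    byte[] prk = mac.doFinal(ikm);
    mac.init(new SecretKeySpec(prk, "HmacSHA256"));
    byte[] okm = new byte[length];
    byte[] t = new byte[0];
    for (int i = 1, pos = 0; pos < length; i++) {
      mac.update(t);         // T(i-1), empty for the first block
      mac.update(info);
      mac.update((byte) i);  // single-byte block counter
      t = mac.doFinal();
      int n = Math.min(t.length, length - pos);
      System.arraycopy(t, 0, okm, pos, n);
      pos += n;
    }
    return okm;
  }

  // Assumption for illustration: the pre-shared secret is the HKDF salt, so a
  // peer without it derives a different 128-bit AES key.
  static byte[] sessionKey(byte[] sharedSecret, String preSharedSecret)
      throws GeneralSecurityException {
    byte[] salt = preSharedSecret.getBytes(StandardCharsets.UTF_8);
    byte[] info = "hypothetical-session-key".getBytes(StandardCharsets.UTF_8);
    return hkdf(salt, sharedSecret, info, 16);
  }

  // AES-GCM with a random 12-byte nonce prepended to the ciphertext.
  static byte[] encrypt(byte[] key, byte[] plaintext) throws GeneralSecurityException {
    byte[] nonce = new byte[12];
    RANDOM.nextBytes(nonce);
    Cipher c = Cipher.getInstance("AES/GCM/NoPadding");
    c.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), new GCMParameterSpec(128, nonce));
    byte[] ct = c.doFinal(plaintext);
    byte[] out = new byte[nonce.length + ct.length];
    System.arraycopy(nonce, 0, out, 0, nonce.length);
    System.arraycopy(ct, 0, out, nonce.length, ct.length);
    return out;
  }

  // Reads the nonce from the first 12 bytes; throws AEADBadTagException on tampering.
  static byte[] decrypt(byte[] key, byte[] message) throws GeneralSecurityException {
    Cipher c = Cipher.getInstance("AES/GCM/NoPadding");
    c.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"),
        new GCMParameterSpec(128, message, 0, 12));
    return c.doFinal(message, 12, message.length - 12);
  }

  public static void main(String[] args) throws GeneralSecurityException {
    String appSecret = "shared-app-secret";  // hypothetical pre-shared secret
    KeyPair client = newKeyPair();
    KeyPair server = newKeyPair();
    // Each side combines its own private key with the other side's public key.
    byte[] clientKey = sessionKey(agree(client, server.getPublic()), appSecret);
    byte[] serverKey = sessionKey(agree(server, client.getPublic()), appSecret);
    System.out.println("keys match: " + Arrays.equals(clientKey, serverKey));
    byte[] msg = encrypt(clientKey, "hello".getBytes(StandardCharsets.UTF_8));
    System.out.println(new String(decrypt(serverKey, msg), StandardCharsets.UTF_8));
  }
}
```

Running main prints "keys match: true" followed by "hello". Mixing the pre-shared secret into HKDF means a peer without it derives a different key, so its first AES-GCM message fails to decrypt; a production protocol would still want explicit key confirmation and would bind transcript data such as the app ID into the derivation, which this sketch omits.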
[spark] branch master updated (9b9db5a -> f4b31c6)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 9b9db5a [SPARK-36491][SQL] Make from_json/to_json to handle timestamp_ntz type properly add f4b31c6 [SPARK-36498][SQL] Reorder inner fields of the input query in byName V2 write No new revisions were added by this update. Summary of changes: .../catalyst/analysis/TableOutputResolver.scala| 170 +++-- ...lysisSuite.scala => V2WriteAnalysisSuite.scala} | 96 2 files changed, 222 insertions(+), 44 deletions(-) rename sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/{DataSourceV2AnalysisSuite.scala => V2WriteAnalysisSuite.scala} (88%)
[spark] branch master updated (8b8d91c -> 9b9db5a)
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 8b8d91c [SPARK-36465][SS] Dynamic gap duration in session window add 9b9db5a [SPARK-36491][SQL] Make from_json/to_json to handle timestamp_ntz type properly No new revisions were added by this update. Summary of changes: .../apache/spark/sql/catalyst/json/JacksonGenerator.scala | 12 .../org/apache/spark/sql/catalyst/json/JacksonParser.scala | 12 .../sql-tests/results/timestampNTZ/timestamp-ansi.sql.out | 5 ++--- .../sql-tests/results/timestampNTZ/timestamp.sql.out | 5 ++--- .../scala/org/apache/spark/sql/JsonFunctionsSuite.scala| 14 +- 5 files changed, 41 insertions(+), 7 deletions(-)