[spark] branch master updated: [SPARK-36052][K8S][FOLLOWUP] Update config version to 3.2.0

2021-08-16 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ea13c5a  [SPARK-36052][K8S][FOLLOWUP] Update config version to 3.2.0
ea13c5a is described below

commit ea13c5a743feb737d6ff5d28c3f8c968c293
Author: Dongjoon Hyun 
AuthorDate: Tue Aug 17 10:28:02 2021 +0900

[SPARK-36052][K8S][FOLLOWUP] Update config version to 3.2.0

### What changes were proposed in this pull request?

This PR is a follow-up to update the version of the config
`spark.kubernetes.allocation.maxPendingPods` from 3.3.0 to 3.2.0.

### Why are the changes needed?

SPARK-36052 landed at branch-3.2 to fix a bug.

### Does this PR introduce _any_ user-facing change?

Yes, but this is a new configuration to fix a bug.

### How was this patch tested?

Pass the CIs.

Closes #33755 from dongjoon-hyun/SPARK-36052.

Authored-by: Dongjoon Hyun 
Signed-off-by: Hyukjin Kwon 
---
 .../core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 2af05f1..2c4adee 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -601,7 +601,7 @@ private[spark] object Config extends Logging {
 "also counted into this limit as they will change into pending PODs by 
time. " +
 "This limit is independent from the resource profiles as it limits the 
sum of all " +
 "allocation for all the used resource profiles.")
-  .version("3.3.0")
+  .version("3.2.0")
   .intConf
   .checkValue(value => value > 0, "Maximum number of pending pods should 
be a positive integer")
   .createWithDefault(Int.MaxValue)




[spark] branch branch-3.2 updated: [SPARK-36052][K8S] Introducing a limit for pending PODs

2021-08-16 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new eb09be9  [SPARK-36052][K8S] Introducing a limit for pending PODs
eb09be9 is described below

commit eb09be9e68737bf2f29ca5391874f4c5fa0de3e2
Author: attilapiros 
AuthorDate: Tue Aug 10 20:16:21 2021 -0700

[SPARK-36052][K8S] Introducing a limit for pending PODs

Introducing a limit for pending PODs (newly created/requested executors 
included).
This limit is global for all the resource profiles. So first we have to count
all the newly created and pending PODs (decreased by the ones which are
requested to be deleted), and then we can share the remaining pending POD
slots among the resource profiles.

Without this PR, dynamic allocation could request too many PODs, the K8S
scheduler could be overloaded, and the scheduling of PODs would be affected
by the load.
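
For illustration, here is a minimal sketch of the global budgeting idea
described above, using hypothetical helper names (the real logic lives in
ExecutorPodsAllocator, shown in the diff below): pods that are pending or
newly requested across all resource profiles count against one shared limit
before any new executors are requested.

```scala
// Minimal sketch with hypothetical names; not the actual ExecutorPodsAllocator code.
def remainingPodBudget(
    maxPendingPods: Int,
    pendingByProfile: Map[Int, Int],        // rpId -> pods already pending in Kubernetes
    newlyRequestedByProfile: Map[Int, Int]  // rpId -> pods requested but not yet known to Kubernetes
): Int = {
  val used = pendingByProfile.values.sum + newlyRequestedByProfile.values.sum
  math.max(0, maxPendingPods - used)        // slots left to share among the resource profiles
}
```

In the patch below, this limit is exposed as the new configuration
`spark.kubernetes.allocation.maxPendingPods`, with a default of `Int.MaxValue`.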

No.

With new unit tests.

Closes #33492 from attilapiros/SPARK-36052.

Authored-by: attilapiros 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 1dced492fb286a7ada73d886fe264f5df0e2b3da)
Signed-off-by: Dongjoon Hyun 
---
 .../scala/org/apache/spark/deploy/k8s/Config.scala | 12 +++
 .../cluster/k8s/ExecutorPodsAllocator.scala| 85 +++---
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 77 
 3 files changed, 148 insertions(+), 26 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 9fceca7..4d352f7 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -543,6 +543,18 @@ private[spark] object Config extends Logging {
   .checkValue(delay => delay > 0, "delay must be a positive time value")
   .createWithDefaultString("30s")
 
+  val KUBERNETES_MAX_PENDING_PODS =
+ConfigBuilder("spark.kubernetes.allocation.maxPendingPods")
+  .doc("Maximum number of pending PODs allowed during executor allocation 
for this " +
+"application. Those newly requested executors which are unknown by 
Kubernetes yet are " +
+"also counted into this limit as they will change into pending PODs by 
time. " +
+"This limit is independent from the resource profiles as it limits the 
sum of all " +
+"allocation for all the used resource profiles.")
+  .version("3.2.0")
+  .intConf
+  .checkValue(value => value > 0, "Maximum number of pending pods should 
be a positive integer")
+  .createWithDefault(Int.MaxValue)
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = 
"spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SERVICE_ANNOTATION_PREFIX = 
"spark.kubernetes.driver.service.annotation."
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index d6dc13e..cee5360 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -57,6 +57,8 @@ private[spark] class ExecutorPodsAllocator(
 
   private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
 
+  private val maxPendingPods = conf.get(KUBERNETES_MAX_PENDING_PODS)
+
   private val podCreationTimeout = math.max(
 podAllocationDelay * 5,
 conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT))
@@ -217,9 +219,15 @@ private[spark] class ExecutorPodsAllocator(
   }
 }
 
+// sum of all the pending pods unknown by the scheduler (total for all the 
resources)
 var totalPendingCount = 0
-// The order we request executors for each ResourceProfile is not 
guaranteed.
-totalExpectedExecutorsPerResourceProfileId.asScala.foreach { case (rpId, 
targetNum) =>
+// total not running pods (including scheduler known & unknown, pending & 
newly requested ones)
+var totalNotRunningPodCount = 0
+val podsToAllocateWithRpId = totalExpectedExecutorsPerResourceProfileId
+  .asScala
+  .toSeq
+  .sortBy(_._1)
+  .flatMap { case (rpId, targetNum) =>
   val podsForRpId = rpIdToExecsAndPodState.getOrElse(rpId, 
mutable.HashMap.empty)
 
   val currentRunningCount = podsForRpId.values.count {
@@ -235,7 +243,7 @@ private[spark] class ExecutorPodsAllocator(
   }
   // This variab

[spark] branch branch-3.0 updated (763a25a -> ae07b63)

2021-08-16 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 763a25a  Update Spark key negotiation protocol
 add ae07b63  [HOTFIX] Add missing deps update for commit protocol change

No new revisions were added by this update.

Summary of changes:
 dev/deps/spark-deps-hadoop-2.7-hive-1.2 | 1 +
 1 file changed, 1 insertion(+)




[spark] branch branch-3.0 updated (dc0f1e5 -> 763a25a)

2021-08-16 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git.


from dc0f1e5  [SPARK-36500][CORE] Fix temp_shuffle file leaking when a task 
is interrupted
 add 763a25a  Update Spark key negotiation protocol

No new revisions were added by this update.

Summary of changes:
 common/network-common/pom.xml  |   4 +
 .../spark/network/crypto/AuthClientBootstrap.java  |   6 +-
 .../apache/spark/network/crypto/AuthEngine.java| 420 +
 .../{ServerResponse.java => AuthMessage.java}  |  56 ++-
 .../spark/network/crypto/AuthRpcHandler.java   |   6 +-
 .../spark/network/crypto/ClientChallenge.java  | 101 -
 .../java/org/apache/spark/network/crypto/README.md | 217 ---
 .../spark/network/crypto/AuthEngineSuite.java  | 182 ++---
 .../spark/network/crypto/AuthMessagesSuite.java|  46 +--
 dev/deps/spark-deps-hadoop-2.7-hive-2.3|   1 +
 dev/deps/spark-deps-hadoop-3.2-hive-2.3|   1 +
 pom.xml|   8 +-
 12 files changed, 432 insertions(+), 616 deletions(-)
 rename 
common/network-common/src/main/java/org/apache/spark/network/crypto/{ServerResponse.java
 => AuthMessage.java} (53%)
 delete mode 100644 
common/network-common/src/main/java/org/apache/spark/network/crypto/ClientChallenge.java




[spark] branch branch-3.2 updated: [SPARK-36521][SQL] Disallow comparison between Interval and String

2021-08-16 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 41e5144  [SPARK-36521][SQL] Disallow comparison between Interval and 
String
41e5144 is described below

commit 41e5144b53d21d4c67e35250594ee418bdfba136
Author: Gengliang Wang 
AuthorDate: Mon Aug 16 22:41:14 2021 +0300

[SPARK-36521][SQL] Disallow comparison between Interval and String

### What changes were proposed in this pull request?

Disallow comparison between Interval and String in the default type 
coercion rules.

### Why are the changes needed?

If a binary comparison contains interval type and string type, we can't 
decide which
interval type the string should be promoted as. There are many possible 
interval
types, such as year interval, month interval, day interval, hour interval, 
etc.

### Does this PR introduce _any_ user-facing change?

No, the new interval type is not released yet.

### How was this patch tested?

Existing UT

Closes #33750 from gengliangwang/disallowCom.

Authored-by: Gengliang Wang 
Signed-off-by: Max Gekk 
(cherry picked from commit 26d6b952dcf7d387930701396de9cef679df7432)
Signed-off-by: Max Gekk 
---
 .../spark/sql/catalyst/analysis/TypeCoercion.scala | 16 +++-
 .../test/resources/sql-tests/inputs/interval.sql   |  6 ++
 .../sql-tests/results/ansi/interval.sql.out| 56 +-
 .../resources/sql-tests/results/interval.sql.out   | 86 ++
 4 files changed, 148 insertions(+), 16 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 23654af..863bdc0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -862,6 +862,18 @@ object TypeCoercion extends TypeCoercionBase {
 case _ => None
   }
 
+  // Return whether a string literal can be promoted as the give data type in 
a binary comparison.
+  private def canPromoteAsInBinaryComparison(dt: DataType) = dt match {
+// If a binary comparison contains interval type and string type, we can't 
decide which
+// interval type the string should be promoted as. There are many possible 
interval
+// types, such as year interval, month interval, day interval, hour 
interval, etc.
+case _: YearMonthIntervalType | _: DayTimeIntervalType => false
+// There is no need to add `Cast` for comparison between strings.
+case _: StringType => false
+case _: AtomicType => true
+case _ => false
+  }
+
   /**
* This function determines the target type of a comparison operator when 
one operand
* is a String and the other is not. It also handles when one op is a Date 
and the
@@ -891,8 +903,8 @@ object TypeCoercion extends TypeCoercionBase {
 case (n: DecimalType, s: StringType) => Some(DoubleType)
 case (s: StringType, n: DecimalType) => Some(DoubleType)
 
-case (l: StringType, r: AtomicType) if r != StringType => Some(r)
-case (l: AtomicType, r: StringType) if l != StringType => Some(l)
+case (l: StringType, r: AtomicType) if canPromoteAsInBinaryComparison(r) 
=> Some(r)
+case (l: AtomicType, r: StringType) if canPromoteAsInBinaryComparison(l) 
=> Some(l)
 case (l, r) => None
   }
 
diff --git a/sql/core/src/test/resources/sql-tests/inputs/interval.sql 
b/sql/core/src/test/resources/sql-tests/inputs/interval.sql
index 618cf16..279c5441 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/interval.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/interval.sql
@@ -341,9 +341,15 @@ SELECT INTERVAL 1 MONTH > INTERVAL 20 DAYS;
 SELECT INTERVAL '1' DAY < '1';
 SELECT INTERVAL '1' DAY = '1';
 SELECT INTERVAL '1' DAY > '1';
+SELECT '1' < INTERVAL '1' DAY;
+SELECT '1' = INTERVAL '1' DAY;
+SELECT '1' > INTERVAL '1' DAY;
 SELECT INTERVAL '1' YEAR < '1';
 SELECT INTERVAL '1' YEAR = '1';
 SELECT INTERVAL '1' YEAR > '1';
+SELECT '1' < INTERVAL '1' YEAR;
+SELECT '1' = INTERVAL '1' YEAR;
+SELECT '1' > INTERVAL '1' YEAR;
 
 SELECT array(INTERVAL '1' YEAR, INTERVAL '1' MONTH);
 SELECT array(INTERVAL '1' DAY, INTERVAL '01:01' HOUR TO MINUTE);
diff --git 
a/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out 
b/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out
index e0bf076..1aa0920 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 251
+-- Number of queries: 257
 
 
 -- !query
@@ -2

[spark] branch master updated: [SPARK-36521][SQL] Disallow comparison between Interval and String

2021-08-16 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 26d6b95  [SPARK-36521][SQL] Disallow comparison between Interval and 
String
26d6b95 is described below

commit 26d6b952dcf7d387930701396de9cef679df7432
Author: Gengliang Wang 
AuthorDate: Mon Aug 16 22:41:14 2021 +0300

[SPARK-36521][SQL] Disallow comparison between Interval and String

### What changes were proposed in this pull request?

Disallow comparison between Interval and String in the default type 
coercion rules.

### Why are the changes needed?

If a binary comparison contains interval type and string type, we can't 
decide which
interval type the string should be promoted as. There are many possible 
interval
types, such as year interval, month interval, day interval, hour interval, 
etc.

### Does this PR introduce _any_ user-facing change?

No, the new interval type is not released yet.

### How was this patch tested?

Existing UT

Closes #33750 from gengliangwang/disallowCom.

Authored-by: Gengliang Wang 
Signed-off-by: Max Gekk 
---
 .../spark/sql/catalyst/analysis/TypeCoercion.scala | 16 +++-
 .../test/resources/sql-tests/inputs/interval.sql   |  6 ++
 .../sql-tests/results/ansi/interval.sql.out| 56 +-
 .../resources/sql-tests/results/interval.sql.out   | 86 ++
 4 files changed, 148 insertions(+), 16 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 23654af..863bdc0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -862,6 +862,18 @@ object TypeCoercion extends TypeCoercionBase {
 case _ => None
   }
 
+  // Return whether a string literal can be promoted as the give data type in 
a binary comparison.
+  private def canPromoteAsInBinaryComparison(dt: DataType) = dt match {
+// If a binary comparison contains interval type and string type, we can't 
decide which
+// interval type the string should be promoted as. There are many possible 
interval
+// types, such as year interval, month interval, day interval, hour 
interval, etc.
+case _: YearMonthIntervalType | _: DayTimeIntervalType => false
+// There is no need to add `Cast` for comparison between strings.
+case _: StringType => false
+case _: AtomicType => true
+case _ => false
+  }
+
   /**
* This function determines the target type of a comparison operator when 
one operand
* is a String and the other is not. It also handles when one op is a Date 
and the
@@ -891,8 +903,8 @@ object TypeCoercion extends TypeCoercionBase {
 case (n: DecimalType, s: StringType) => Some(DoubleType)
 case (s: StringType, n: DecimalType) => Some(DoubleType)
 
-case (l: StringType, r: AtomicType) if r != StringType => Some(r)
-case (l: AtomicType, r: StringType) if l != StringType => Some(l)
+case (l: StringType, r: AtomicType) if canPromoteAsInBinaryComparison(r) 
=> Some(r)
+case (l: AtomicType, r: StringType) if canPromoteAsInBinaryComparison(l) 
=> Some(l)
 case (l, r) => None
   }
 
diff --git a/sql/core/src/test/resources/sql-tests/inputs/interval.sql 
b/sql/core/src/test/resources/sql-tests/inputs/interval.sql
index 618cf16..279c5441 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/interval.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/interval.sql
@@ -341,9 +341,15 @@ SELECT INTERVAL 1 MONTH > INTERVAL 20 DAYS;
 SELECT INTERVAL '1' DAY < '1';
 SELECT INTERVAL '1' DAY = '1';
 SELECT INTERVAL '1' DAY > '1';
+SELECT '1' < INTERVAL '1' DAY;
+SELECT '1' = INTERVAL '1' DAY;
+SELECT '1' > INTERVAL '1' DAY;
 SELECT INTERVAL '1' YEAR < '1';
 SELECT INTERVAL '1' YEAR = '1';
 SELECT INTERVAL '1' YEAR > '1';
+SELECT '1' < INTERVAL '1' YEAR;
+SELECT '1' = INTERVAL '1' YEAR;
+SELECT '1' > INTERVAL '1' YEAR;
 
 SELECT array(INTERVAL '1' YEAR, INTERVAL '1' MONTH);
 SELECT array(INTERVAL '1' DAY, INTERVAL '01:01' HOUR TO MINUTE);
diff --git 
a/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out 
b/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out
index e0bf076..1aa0920 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 251
+-- Number of queries: 257
 
 
 -- !query
@@ -2328,6 +2328,33 @@ cannot resolve '(INTERVAL '1' DAY > '1')' due to data 
type mismatch: differing t
 
 
 -- !q

[spark] branch master updated (05cd5f9 -> 3d57e00)

2021-08-16 Thread viirya
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 05cd5f9  [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started 
error message in BlockPushErrorHandler in client
 add 3d57e00  [SPARK-36041][SS][DOCS] Introduce the 
RocksDBStateStoreProvider in the programming guide

No new revisions were added by this update.

Summary of changes:
 docs/structured-streaming-programming-guide.md | 75 +-
 1 file changed, 74 insertions(+), 1 deletion(-)




[spark] branch branch-3.2 updated: [SPARK-36041][SS][DOCS] Introduce the RocksDBStateStoreProvider in the programming guide

2021-08-16 Thread viirya
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 4caa43e  [SPARK-36041][SS][DOCS] Introduce the 
RocksDBStateStoreProvider in the programming guide
4caa43e is described below

commit 4caa43e3989998506076c24d3b7020d986249e9f
Author: Yuanjian Li 
AuthorDate: Mon Aug 16 12:32:08 2021 -0700

[SPARK-36041][SS][DOCS] Introduce the RocksDBStateStoreProvider in the 
programming guide

### What changes were proposed in this pull request?
Add the document for the new RocksDBStateStoreProvider.

### Why are the changes needed?
User guide for the new feature.

### Does this PR introduce _any_ user-facing change?
No, doc only.

### How was this patch tested?
Doc only.

Closes #33683 from xuanyuanking/SPARK-36041.

Authored-by: Yuanjian Li 
Signed-off-by: Liang-Chi Hsieh 
(cherry picked from commit 3d57e00a7f8d8f2f7dc0bfbfb0466ef38fb3da08)
Signed-off-by: Liang-Chi Hsieh 
---
 docs/structured-streaming-programming-guide.md | 75 +-
 1 file changed, 74 insertions(+), 1 deletion(-)

diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index b56d8c8..3f89111 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1870,7 +1870,80 @@ hence the number is not same as the number of original 
input rows. You'd like to
 There's a known workaround: split your streaming query into multiple queries 
per stateful operator, and ensure
 end-to-end exactly once per query. Ensuring end-to-end exactly once for the 
last query is optional.
 
-### State Store and task locality
+### State Store
+
+State store is a versioned key-value store which provides both read and write 
operations. In
+Structured Streaming, we use the state store provider to handle the stateful 
operations across
+batches. There are two built-in state store provider implementations. End 
users can also implement
+their own state store provider by extending StateStoreProvider interface.
+
+ HDFS state store provider
+
+The HDFS backend state store provider is the default implementation of 
[[StateStoreProvider]] and
+[[StateStore]] in which all the data is stored in memory map in the first 
stage, and then backed
+by files in an HDFS-compatible file system. All updates to the store have to 
be done in sets
+transactionally, and each set of updates increments the store's version. These 
versions can be
+used to re-execute the updates (by retries in RDD operations) on the correct 
version of the store,
+and regenerate the store version.
+
+ RocksDB state store implementation
+
+As of Spark 3.2, we add a new built-in state store implementation, RocksDB 
state store provider.
+
+If you have stateful operations in your streaming query (for example, 
streaming aggregation,
+streaming dropDuplicates, stream-stream joins, mapGroupsWithState, or 
flatMapGroupsWithState)
+and you want to maintain millions of keys in the state, then you may face 
issues related to large
+JVM garbage collection (GC) pauses causing high variations in the micro-batch 
processing times.
+This occurs because, by the implementation of HDFSBackedStateStore, the state 
data is maintained
+in the JVM memory of the executors and large number of state objects puts 
memory pressure on the
+JVM causing high GC pauses.
+
+In such cases, you can choose to use a more optimized state management 
solution based on
+[RocksDB](https://rocksdb.org/). Rather than keeping the state in the JVM 
memory, this solution
+uses RocksDB to efficiently manage the state in the native memory and the 
local disk. Furthermore,
+any changes to this state are automatically saved by Structured Streaming to 
the checkpoint
+location you have provided, thus providing full fault-tolerance guarantees 
(the same as default
+state management).
+
+To enable the new build-in state store implementation, set 
`spark.sql.streaming.stateStore.providerClass`
+to `org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider`.
+
+Here are the configs regarding to RocksDB instance of the state store provider:
+
+
+  
+Config Name
+Description
+Default Value
+  
+  
+spark.sql.streaming.stateStore.rocksdb.compactOnCommit
+Whether we perform a range compaction of RocksDB instance for commit 
operation
+False
+  
+  
+spark.sql.streaming.stateStore.rocksdb.blockSizeKB
+Approximate size in KB of user data packed per block for a RocksDB 
BlockBasedTable, which is a RocksDB's default SST file format.
+4
+  
+  
+spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB
+The size capacity in MB for a cache of blocks.
+8
+  
+  
+spark.sql.streaming.stateStore.rocksdb.lo
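
A minimal sketch of enabling the RocksDB state store provider described in the
documentation excerpt above (the local master URL and session setup are
assumptions):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch, assuming Spark 3.2+.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
  .getOrCreate()

// Stateful streaming queries started on this session keep their state in RocksDB
// (native memory + local disk) instead of the executor JVM heap, while changes are
// still checkpointed for fault tolerance.
```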

[spark] branch branch-3.2 updated: [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message in BlockPushErrorHandler in client

2021-08-16 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 2fb62e0  [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started 
error message in BlockPushErrorHandler in client
2fb62e0 is described below

commit 2fb62e0e3d40835a3e61fcf210e0772cd0d21a68
Author: zhuqi-lucas <821684...@qq.com>
AuthorDate: Mon Aug 16 13:58:48 2021 -0500

[SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message 
in BlockPushErrorHandler in client

### What changes were proposed in this pull request?
Add a new type of error message in BlockPushErrorHandler which indicates 
the PushblockStream message is received after a new application attempt has 
started. This error message should be correctly handled in client without 
retrying the block push.

### Why are the changes needed?
When we get a block push failure because of the too old attempt, we will 
not retry pushing the block nor log the exception on the client side.
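
A hedged sketch of the client-side decision, using the shouldNotRetryErrorCode
helper added in this patch; the surrounding handler is hypothetical, not the
actual ShuffleBlockPusher code:

```scala
import org.apache.spark.network.server.BlockPushNonFatalFailure
import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode

// Hypothetical handler: decide what to do with a failed block push.
def handlePushFailure(blockId: String, code: ReturnCode): Unit = {
  if (BlockPushNonFatalFailure.shouldNotRetryErrorCode(code)) {
    // TOO_LATE_BLOCK_PUSH, STALE_BLOCK_PUSH and the new TOO_OLD_ATTEMPT_PUSH:
    // drop the push silently, without retrying or logging the exception.
  } else {
    // Other non-fatal codes fall through to the normal error-handling path.
  }
}
```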

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add the corresponding unit test.

Closes #33617 from zhuqi-lucas/master.

Authored-by: zhuqi-lucas <821684...@qq.com>
Signed-off-by: Mridul Muralidharan gmail.com>
(cherry picked from commit 05cd5f97c3dea25dacdbdb319243cdab9667c774)
Signed-off-by: Mridul Muralidharan 
---
 .../network/server/BlockPushNonFatalFailure.java   | 22 +++-
 .../network/server/TransportRequestHandler.java| 13 --
 .../apache/spark/network/shuffle/ErrorHandler.java |  7 ++---
 .../network/shuffle/RemoteBlockPushResolver.java   | 30 --
 .../spark/network/shuffle/ErrorHandlerSuite.java   |  4 +++
 .../shuffle/RemoteBlockPushResolverSuite.java  | 17 +++-
 .../apache/spark/shuffle/ShuffleBlockPusher.scala  | 10 +++-
 .../spark/shuffle/ShuffleBlockPusherSuite.scala|  6 +
 8 files changed, 77 insertions(+), 32 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
index 5906fa2..4bb22b2 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
@@ -40,6 +40,14 @@ public class BlockPushNonFatalFailure extends 
RuntimeException {
 " is received after merged shuffle is finalized";
 
   /**
+   * String constant used for generating exception messages indicating the 
application attempt is
+   * not the latest attempt on the server side. When we get a block push 
failure because of the too
+   * old attempt, we will not retry pushing the block nor log the exception on 
the client side.
+   */
+  public static final String TOO_OLD_ATTEMPT_SUFFIX =
+" is from an older app attempt";
+
+  /**
* String constant used for generating exception messages indicating a block 
to be merged
* is a stale block push in the case of indeterminate stage retries on the 
server side.
* When we get a block push failure because of the block push being stale, 
we will not
@@ -124,7 +132,12 @@ public class BlockPushNonFatalFailure extends 
RuntimeException {
  * indeterminate stage retries. When the client receives this code, it 
will not retry
  * pushing the block.
  */
-STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX);
+STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX),
+/**
+ * Indicate the application attempt is not the latest attempt on the 
server side.
+ * When the client gets this code, it will not retry pushing the block.
+ */
+TOO_OLD_ATTEMPT_PUSH(4, TOO_OLD_ATTEMPT_SUFFIX);
 
 private final byte id;
 // Error message suffix used to generate an error message for a given 
ReturnCode and
@@ -146,10 +159,17 @@ public class BlockPushNonFatalFailure extends 
RuntimeException {
   case 1: return ReturnCode.TOO_LATE_BLOCK_PUSH;
   case 2: return ReturnCode.BLOCK_APPEND_COLLISION_DETECTED;
   case 3: return ReturnCode.STALE_BLOCK_PUSH;
+  case 4: return ReturnCode.TOO_OLD_ATTEMPT_PUSH;
   default: throw new IllegalArgumentException("Unknown block push return 
code: " + id);
 }
   }
 
+  public static boolean shouldNotRetryErrorCode(ReturnCode returnCode) {
+return returnCode == ReturnCode.TOO_LATE_BLOCK_PUSH ||
+  returnCode == ReturnCode.STALE_BLOCK_PUSH ||
+  returnCode == ReturnCode.TOO_OLD_ATTEMPT_PUSH;
+  }
+
   public static String getErrorMsg(String blockId, ReturnCode errorCode) {
 Preconditions.checkArgument(errorCode != ReturnCode.SUCCESS);
 return "Block " + blockId +

[spark] branch master updated: [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message in BlockPushErrorHandler in client

2021-08-16 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 05cd5f9  [SPARK-35548][CORE][SHUFFLE] Handling new attempt has started 
error message in BlockPushErrorHandler in client
05cd5f9 is described below

commit 05cd5f97c3dea25dacdbdb319243cdab9667c774
Author: zhuqi-lucas <821684...@qq.com>
AuthorDate: Mon Aug 16 13:58:48 2021 -0500

[SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error message 
in BlockPushErrorHandler in client

### What changes were proposed in this pull request?
Add a new type of error message in BlockPushErrorHandler which indicates 
the PushblockStream message is received after a new application attempt has 
started. This error message should be correctly handled in client without 
retrying the block push.

### Why are the changes needed?
When we get a block push failure because of the too old attempt, we will 
not retry pushing the block nor log the exception on the client side.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add the corresponding unit test.

Closes #33617 from zhuqi-lucas/master.

Authored-by: zhuqi-lucas <821684...@qq.com>
Signed-off-by: Mridul Muralidharan gmail.com>
---
 .../network/server/BlockPushNonFatalFailure.java   | 22 +++-
 .../network/server/TransportRequestHandler.java| 13 --
 .../apache/spark/network/shuffle/ErrorHandler.java |  7 ++---
 .../network/shuffle/RemoteBlockPushResolver.java   | 30 --
 .../spark/network/shuffle/ErrorHandlerSuite.java   |  4 +++
 .../shuffle/RemoteBlockPushResolverSuite.java  | 17 +++-
 .../apache/spark/shuffle/ShuffleBlockPusher.scala  | 10 +++-
 .../spark/shuffle/ShuffleBlockPusherSuite.scala|  6 +
 8 files changed, 77 insertions(+), 32 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
index 5906fa2..4bb22b2 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
@@ -40,6 +40,14 @@ public class BlockPushNonFatalFailure extends 
RuntimeException {
 " is received after merged shuffle is finalized";
 
   /**
+   * String constant used for generating exception messages indicating the 
application attempt is
+   * not the latest attempt on the server side. When we get a block push 
failure because of the too
+   * old attempt, we will not retry pushing the block nor log the exception on 
the client side.
+   */
+  public static final String TOO_OLD_ATTEMPT_SUFFIX =
+" is from an older app attempt";
+
+  /**
* String constant used for generating exception messages indicating a block 
to be merged
* is a stale block push in the case of indeterminate stage retries on the 
server side.
* When we get a block push failure because of the block push being stale, 
we will not
@@ -124,7 +132,12 @@ public class BlockPushNonFatalFailure extends 
RuntimeException {
  * indeterminate stage retries. When the client receives this code, it 
will not retry
  * pushing the block.
  */
-STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX);
+STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX),
+/**
+ * Indicate the application attempt is not the latest attempt on the 
server side.
+ * When the client gets this code, it will not retry pushing the block.
+ */
+TOO_OLD_ATTEMPT_PUSH(4, TOO_OLD_ATTEMPT_SUFFIX);
 
 private final byte id;
 // Error message suffix used to generate an error message for a given 
ReturnCode and
@@ -146,10 +159,17 @@ public class BlockPushNonFatalFailure extends 
RuntimeException {
   case 1: return ReturnCode.TOO_LATE_BLOCK_PUSH;
   case 2: return ReturnCode.BLOCK_APPEND_COLLISION_DETECTED;
   case 3: return ReturnCode.STALE_BLOCK_PUSH;
+  case 4: return ReturnCode.TOO_OLD_ATTEMPT_PUSH;
   default: throw new IllegalArgumentException("Unknown block push return 
code: " + id);
 }
   }
 
+  public static boolean shouldNotRetryErrorCode(ReturnCode returnCode) {
+return returnCode == ReturnCode.TOO_LATE_BLOCK_PUSH ||
+  returnCode == ReturnCode.STALE_BLOCK_PUSH ||
+  returnCode == ReturnCode.TOO_OLD_ATTEMPT_PUSH;
+  }
+
   public static String getErrorMsg(String blockId, ReturnCode errorCode) {
 Preconditions.checkArgument(errorCode != ReturnCode.SUCCESS);
 return "Block " + blockId + errorCode.errorMsgSuffix;
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportReq

[spark] branch branch-3.2 updated: [SPARK-36469][PYTHON] Implement Index.map

2021-08-16 Thread ueshin
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new cb14a32  [SPARK-36469][PYTHON] Implement Index.map
cb14a32 is described below

commit cb14a3200524ab757f7e68b35e08c43467c29d8b
Author: Xinrong Meng 
AuthorDate: Mon Aug 16 11:06:10 2021 -0700

[SPARK-36469][PYTHON] Implement Index.map

### What changes were proposed in this pull request?
Implement `Index.map`.

The PR is based on https://github.com/databricks/koalas/pull/2136. Thanks 
awdavidson for the prototype.

`map` of CategoricalIndex and DatetimeIndex will be implemented in separate 
PRs.

### Why are the changes needed?
Mapping values using input correspondence (a dict, Series, or function) is 
supported in pandas as 
[Index.map](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Index.map.html).
We shall also support that.

### Does this PR introduce _any_ user-facing change?
Yes. `Index.map` is available now.

```py
>>> psidx = ps.Index([1, 2, 3])

>>> psidx.map({1: "one", 2: "two", 3: "three"})
Index(['one', 'two', 'three'], dtype='object')

>>> psidx.map(lambda id: "{id} + 1".format(id=id))
Index(['1 + 1', '2 + 1', '3 + 1'], dtype='object')

>>> pser = pd.Series(["one", "two", "three"], index=[1, 2, 3])
>>> psidx.map(pser)
Index(['one', 'two', 'three'], dtype='object')
```

### How was this patch tested?
Unit tests.

Closes #33694 from xinrong-databricks/index_map.

Authored-by: Xinrong Meng 
Signed-off-by: Takuya UESHIN 
(cherry picked from commit 4dcd74602571d36a3b9129f0886e1cfc33d7fdc8)
Signed-off-by: Takuya UESHIN 
---
 .../source/reference/pyspark.pandas/indexing.rst   |  1 +
 python/pyspark/pandas/indexes/base.py  | 46 +-
 python/pyspark/pandas/indexes/category.py  |  7 ++
 python/pyspark/pandas/indexes/datetimes.py |  9 ++-
 python/pyspark/pandas/indexes/multi.py |  7 ++
 python/pyspark/pandas/missing/indexes.py   |  2 +-
 python/pyspark/pandas/tests/indexes/test_base.py   | 74 ++
 7 files changed, 143 insertions(+), 3 deletions(-)

diff --git a/python/docs/source/reference/pyspark.pandas/indexing.rst 
b/python/docs/source/reference/pyspark.pandas/indexing.rst
index 677d80f..9d53f00 100644
--- a/python/docs/source/reference/pyspark.pandas/indexing.rst
+++ b/python/docs/source/reference/pyspark.pandas/indexing.rst
@@ -64,6 +64,7 @@ Modifying and computations
Index.drop_duplicates
Index.min
Index.max
+   Index.map
Index.rename
Index.repeat
Index.take
diff --git a/python/pyspark/pandas/indexes/base.py 
b/python/pyspark/pandas/indexes/base.py
index 6c842bc..a43a5d1 100644
--- a/python/pyspark/pandas/indexes/base.py
+++ b/python/pyspark/pandas/indexes/base.py
@@ -16,7 +16,7 @@
 #
 
 from functools import partial
-from typing import Any, Iterator, List, Optional, Tuple, Union, cast, 
no_type_check
+from typing import Any, Callable, Iterator, List, Optional, Tuple, Union, 
cast, no_type_check
 import warnings
 
 import pandas as pd
@@ -521,6 +521,50 @@ class Index(IndexOpsMixin):
 result = result.copy()
 return result
 
+def map(
+self, mapper: Union[dict, Callable[[Any], Any], pd.Series], na_action: 
Optional[str] = None
+) -> "Index":
+"""
+Map values using input correspondence (a dict, Series, or function).
+
+Parameters
+--
+mapper : function, dict, or pd.Series
+Mapping correspondence.
+na_action : {None, 'ignore'}
+If ‘ignore’, propagate NA values, without passing them to the 
mapping correspondence.
+
+Returns
+---
+applied : Index, inferred
+The output of the mapping function applied to the index.
+
+Examples
+
+>>> psidx = ps.Index([1, 2, 3])
+
+>>> psidx.map({1: "one", 2: "two", 3: "three"})
+Index(['one', 'two', 'three'], dtype='object')
+
+>>> psidx.map(lambda id: "{id} + 1".format(id=id))
+Index(['1 + 1', '2 + 1', '3 + 1'], dtype='object')
+
+>>> pser = pd.Series(["one", "two", "three"], index=[1, 2, 3])
+>>> psidx.map(pser)
+Index(['one', 'two', 'three'], dtype='object')
+"""
+if isinstance(mapper, dict):
+if len(set(type(k) for k in mapper.values())) > 1:
+raise TypeError(
+"If the mapper is a dictionary, its values must be of the 
same type"
+)
+
+return Index(
+self.to_series().pandas_on_spark.transform_batch(
+lambda pser: pser.map(mapper, na_action)
+)
+).rename(self.name)
+
 @

[spark] branch master updated: [SPARK-36469][PYTHON] Implement Index.map

2021-08-16 Thread ueshin
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4dcd746  [SPARK-36469][PYTHON] Implement Index.map
4dcd746 is described below

commit 4dcd74602571d36a3b9129f0886e1cfc33d7fdc8
Author: Xinrong Meng 
AuthorDate: Mon Aug 16 11:06:10 2021 -0700

[SPARK-36469][PYTHON] Implement Index.map

### What changes were proposed in this pull request?
Implement `Index.map`.

The PR is based on https://github.com/databricks/koalas/pull/2136. Thanks 
awdavidson for the prototype.

`map` of CategoricalIndex and DatetimeIndex will be implemented in separate 
PRs.

### Why are the changes needed?
Mapping values using input correspondence (a dict, Series, or function) is 
supported in pandas as 
[Index.map](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Index.map.html).
We shall also support that.

### Does this PR introduce _any_ user-facing change?
Yes. `Index.map` is available now.

```py
>>> psidx = ps.Index([1, 2, 3])

>>> psidx.map({1: "one", 2: "two", 3: "three"})
Index(['one', 'two', 'three'], dtype='object')

>>> psidx.map(lambda id: "{id} + 1".format(id=id))
Index(['1 + 1', '2 + 1', '3 + 1'], dtype='object')

>>> pser = pd.Series(["one", "two", "three"], index=[1, 2, 3])
>>> psidx.map(pser)
Index(['one', 'two', 'three'], dtype='object')
```

### How was this patch tested?
Unit tests.

Closes #33694 from xinrong-databricks/index_map.

Authored-by: Xinrong Meng 
Signed-off-by: Takuya UESHIN 
---
 .../source/reference/pyspark.pandas/indexing.rst   |  1 +
 python/pyspark/pandas/indexes/base.py  | 46 +-
 python/pyspark/pandas/indexes/category.py  |  7 ++
 python/pyspark/pandas/indexes/datetimes.py |  9 ++-
 python/pyspark/pandas/indexes/multi.py |  7 ++
 python/pyspark/pandas/missing/indexes.py   |  2 +-
 python/pyspark/pandas/tests/indexes/test_base.py   | 74 ++
 7 files changed, 143 insertions(+), 3 deletions(-)

diff --git a/python/docs/source/reference/pyspark.pandas/indexing.rst 
b/python/docs/source/reference/pyspark.pandas/indexing.rst
index 677d80f..9d53f00 100644
--- a/python/docs/source/reference/pyspark.pandas/indexing.rst
+++ b/python/docs/source/reference/pyspark.pandas/indexing.rst
@@ -64,6 +64,7 @@ Modifying and computations
Index.drop_duplicates
Index.min
Index.max
+   Index.map
Index.rename
Index.repeat
Index.take
diff --git a/python/pyspark/pandas/indexes/base.py 
b/python/pyspark/pandas/indexes/base.py
index ccdb54c..9d0d75a 100644
--- a/python/pyspark/pandas/indexes/base.py
+++ b/python/pyspark/pandas/indexes/base.py
@@ -16,7 +16,7 @@
 #
 
 from functools import partial
-from typing import Any, Iterator, List, Optional, Tuple, Union, cast, 
no_type_check
+from typing import Any, Callable, Iterator, List, Optional, Tuple, Union, 
cast, no_type_check
 import warnings
 
 import pandas as pd
@@ -521,6 +521,50 @@ class Index(IndexOpsMixin):
 result = result.copy()
 return result
 
+def map(
+self, mapper: Union[dict, Callable[[Any], Any], pd.Series], na_action: 
Optional[str] = None
+) -> "Index":
+"""
+Map values using input correspondence (a dict, Series, or function).
+
+Parameters
+--
+mapper : function, dict, or pd.Series
+Mapping correspondence.
+na_action : {None, 'ignore'}
+If ‘ignore’, propagate NA values, without passing them to the 
mapping correspondence.
+
+Returns
+---
+applied : Index, inferred
+The output of the mapping function applied to the index.
+
+Examples
+
+>>> psidx = ps.Index([1, 2, 3])
+
+>>> psidx.map({1: "one", 2: "two", 3: "three"})
+Index(['one', 'two', 'three'], dtype='object')
+
+>>> psidx.map(lambda id: "{id} + 1".format(id=id))
+Index(['1 + 1', '2 + 1', '3 + 1'], dtype='object')
+
+>>> pser = pd.Series(["one", "two", "three"], index=[1, 2, 3])
+>>> psidx.map(pser)
+Index(['one', 'two', 'three'], dtype='object')
+"""
+if isinstance(mapper, dict):
+if len(set(type(k) for k in mapper.values())) > 1:
+raise TypeError(
+"If the mapper is a dictionary, its values must be of the 
same type"
+)
+
+return Index(
+self.to_series().pandas_on_spark.transform_batch(
+lambda pser: pser.map(mapper, na_action)
+)
+).rename(self.name)
+
 @property
 def values(self) -> np.ndarray:
 """
diff --git a/python/pyspark/pandas/indexes/category.py 

[spark] branch branch-3.2 updated: [SPARK-32210][CORE] Fix NegativeArraySizeException in MapOutputTracker with large spark.default.parallelism

2021-08-16 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 9149cad  [SPARK-32210][CORE] Fix NegativeArraySizeException in 
MapOutputTracker with large spark.default.parallelism
9149cad is described below

commit 9149cad57d04f51e246f7a61cd62577cbec73190
Author: Kazuyuki Tanimura 
AuthorDate: Mon Aug 16 09:11:39 2021 -0700

[SPARK-32210][CORE] Fix NegativeArraySizeException in MapOutputTracker with 
large spark.default.parallelism

### What changes were proposed in this pull request?
The current `MapOutputTracker` class may throw `NegativeArraySizeException`
with a large number of partitions. Within the serializeOutputStatuses() method,
it compresses an array of mapStatuses and writes the binary data into an
(Apache) ByteArrayOutputStream. Inside (Apache) ByteArrayOutputStream.toByteArray(),
a negative index exception happens because the index is an int and overflows
(the 2GB limit) when the output binary size is too large.

This PR proposes two high-level ideas:
  1. Use `org.apache.spark.util.io.ChunkedByteBufferOutputStream`, which 
has a way to output the underlying buffer as `Array[Array[Byte]]`.
  2. Change the signatures from `Array[Byte]` to `Array[Array[Byte]]` in 
order to handle over 2GB compressed data.
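
For illustration, a minimal sketch of the chunking idea (a hypothetical class,
not the actual ChunkedByteBufferOutputStream): no single byte array grows past
the chunk size, so the Int-indexed ~2GB limit of a single Array[Byte] is never
hit, and the result is exposed as Array[Array[Byte]].

```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import scala.collection.mutable.ArrayBuffer

// Hypothetical chunked output stream, for illustration only.
class ChunkedOutput(chunkSize: Int) extends OutputStream {
  private val chunks = ArrayBuffer(new ByteArrayOutputStream())
  override def write(b: Int): Unit = {
    if (chunks.last.size() >= chunkSize) chunks += new ByteArrayOutputStream()
    chunks.last.write(b)
  }
  def toByteArrays: Array[Array[Byte]] = chunks.map(_.toByteArray).toArray
}
```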

### Why are the changes needed?
This issue seems to have been missed in the earlier effort to address 2GB
limitations, [SPARK-6235](https://issues.apache.org/jira/browse/SPARK-6235).

Without this fix, `spark.default.parallelism` needs to be kept at a low
number. The drawback of a smaller spark.default.parallelism is that it
requires more executor memory (more data per partition). Setting
`spark.io.compression.zstd.level` to a higher number (default 1) hardly helps.

That essentially means there is a data size limit for shuffling, and it does
not scale.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Passed existing tests
```
build/sbt "core/testOnly org.apache.spark.MapOutputTrackerSuite"
```
Also added a new unit test
```
build/sbt "core/testOnly org.apache.spark.MapOutputTrackerSuite  -- -z 
SPARK-32210"
```
Ran the benchmark using GitHub Actions and didn't not observe any 
performance penalties. The results are attached in this PR
```
core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
core/benchmarks/MapStatusesSerDeserBenchmark-results.txt
```

Closes #33721 from kazuyukitanimura/SPARK-32210.

Authored-by: Kazuyuki Tanimura 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 8ee464cd7a09302cacc47a4cbc98fdf307f39dbd)
Signed-off-by: Dongjoon Hyun 
---
 .../MapStatusesSerDeserBenchmark-jdk11-results.txt | 54 
 .../MapStatusesSerDeserBenchmark-results.txt   | 54 
 .../scala/org/apache/spark/MapOutputTracker.scala  | 52 +---
 .../org/apache/spark/MapOutputTrackerSuite.scala   | 72 +-
 .../spark/MapStatusesSerDeserBenchmark.scala   |  4 +-
 5 files changed, 156 insertions(+), 80 deletions(-)

diff --git a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt 
b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
index 29699a2..0481630 100644
--- a/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
+++ b/core/benchmarks/MapStatusesSerDeserBenchmark-jdk11-results.txt
@@ -1,64 +1,64 @@
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
 20 MapOutputs, 10 blocks w/ broadcast:  Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
 
-
-Serialization                179            194            9          1.1         897.4       1.0X
-Deserialization              254            321           74          0.8        1271.0       0.7X
+Serialization                148            164            8          1.4         739.6       1.0X
+Deserialization              202            303           72          1.0        1009.9       0.7X
 
-Compressed Serialized MapStatus sizes: 409 bytes
+Compressed Serialized MapStatus sizes: 412 bytes
 Compressed Serialized Broadcast MapStatus sizes: 2 MB
 
 
-OpenJDK 64-Bit Server VM 11.0.10+9-LTS on Linux 5.4.0-1043-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 5.8.0-1039-azure

[spark] branch master updated (f620996 -> 8ee464c)

2021-08-16 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from f620996  [SPARK-36418][SQL] Use CAST in parsing of dates/timestamps 
with default pattern
 add 8ee464c  [SPARK-32210][CORE] Fix NegativeArraySizeException in 
MapOutputTracker with large spark.default.parallelism

No new revisions were added by this update.

Summary of changes:
 .../MapStatusesSerDeserBenchmark-jdk11-results.txt | 54 
 .../MapStatusesSerDeserBenchmark-results.txt   | 54 
 .../scala/org/apache/spark/MapOutputTracker.scala  | 52 +---
 .../org/apache/spark/MapOutputTrackerSuite.scala   | 72 +-
 .../spark/MapStatusesSerDeserBenchmark.scala   |  4 +-
 5 files changed, 156 insertions(+), 80 deletions(-)




[spark] branch master updated: [SPARK-36418][SQL] Use CAST in parsing of dates/timestamps with default pattern

2021-08-16 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f620996  [SPARK-36418][SQL] Use CAST in parsing of dates/timestamps 
with default pattern
f620996 is described below

commit f620996142ba312f7e52f75476b1b18be667ffdf
Author: Max Gekk 
AuthorDate: Mon Aug 16 23:29:33 2021 +0800

[SPARK-36418][SQL] Use CAST in parsing of dates/timestamps with default 
pattern

### What changes were proposed in this pull request?
In the PR, I propose to use the `CAST` logic when the pattern is not 
specified in `DateFormatter` or `TimestampFormatter`. In particular, invoke the 
`DateTimeUtils.stringToTimestampAnsi()` or `stringToDateAnsi()` in the case.

### Why are the changes needed?
1. This can improve user experience with Spark SQL by making the default
date/timestamp parsers more flexible and tolerant of their inputs.
2. We make the default case consistent with the behavior of the `CAST`
expression, which makes the implementation more consistent.

### Does this PR introduce _any_ user-facing change?
The changes shouldn't introduce behavior changes in regular cases, but they can
influence corner cases. The new implementation is able to parse more
dates/timestamps by default. For instance, the old (current) date parser can
recognize dates only in the format **yyyy-MM-dd**, but the new one can handle:
   * `[+-]yyyy*`
   * `[+-]yyyy*-[m]m`
   * `[+-]yyyy*-[m]m-[d]d`
   * `[+-]yyyy*-[m]m-[d]d `
   * `[+-]yyyy*-[m]m-[d]d *`
   * `[+-]yyyy*-[m]m-[d]dT*`

Similarly for timestamps. The old (current) timestamp formatter is able to
parse timestamps only in the format **yyyy-MM-dd HH:mm:ss** by default, but the
new implementation can handle:
   * `[+-]yyyy*`
   * `[+-]yyyy*-[m]m`
   * `[+-]yyyy*-[m]m-[d]d`
   * `[+-]yyyy*-[m]m-[d]d `
   * `[+-]yyyy*-[m]m-[d]d [h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`
   * `[+-]yyyy*-[m]m-[d]dT[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`
   * `[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`
   * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us][zone_id]`
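
As a hedged illustration of the CAST behavior that the default formatters now
follow (the SparkSession setup is an assumption; the formatter changes are in
the diff below):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch, assuming a local Spark build.
val spark = SparkSession.builder().master("local[*]").getOrCreate()

// CAST accepts single-digit month/day/time fields that the strict
// yyyy-MM-dd / yyyy-MM-dd HH:mm:ss patterns reject; the default parsers now
// accept the same inputs.
spark.sql("SELECT CAST('2018-1' AS DATE)").show()               // 2018-01-01
spark.sql("SELECT CAST('2018-1-2 3:4:5' AS TIMESTAMP)").show()  // 2018-01-02 03:04:05
```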

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly *ImageFileFormatSuite"
$ build/sbt "test:testOnly *ParquetV2PartitionDiscoverySuite"
```

Closes #33709 from MaxGekk/datetime-cast-default-pattern.

Authored-by: Max Gekk 
Signed-off-by: Wenchen Fan 
---
 .../ml/source/image/ImageFileFormatSuite.scala | 17 ++-
 .../spark/sql/catalyst/util/DateFormatter.scala| 33 ++---
 .../sql/catalyst/util/TimestampFormatter.scala | 34 +++---
 .../sql/catalyst/util/DateFormatterSuite.scala | 22 ++
 .../catalyst/util/TimestampFormatterSuite.scala| 25 
 .../parquet/ParquetPartitionDiscoverySuite.scala   |  2 +-
 6 files changed, 116 insertions(+), 17 deletions(-)

diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
index 0ec2747..7dca81e 100644
--- 
a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.ml.source.image
 
 import java.net.URI
 import java.nio.file.Paths
+import java.sql.Date
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.image.ImageSchema._
@@ -95,14 +96,14 @@ class ImageFileFormatSuite extends SparkFunSuite with 
MLlibTestSparkContext {
   .collect()
 
 assert(Set(result: _*) === Set(
-  Row("29.5.a_b_EGDP022204.jpg", "kittens", "2018-01"),
-  Row("54893.jpg", "kittens", "2018-02"),
-  Row("DP153539.jpg", "kittens", "2018-02"),
-  Row("DP802813.jpg", "kittens", "2018-02"),
-  Row("BGRA.png", "multichannel", "2018-01"),
-  Row("BGRA_alpha_60.png", "multichannel", "2018-01"),
-  Row("chr30.4.184.jpg", "multichannel", "2018-02"),
-  Row("grayscale.jpg", "multichannel", "2018-02")
+  Row("29.5.a_b_EGDP022204.jpg", "kittens", Date.valueOf("2018-01-01")),
+  Row("54893.jpg", "kittens", Date.valueOf("2018-02-01")),
+  Row("DP153539.jpg", "kittens", Date.valueOf("2018-02-01")),
+  Row("DP802813.jpg", "kittens", Date.valueOf("2018-02-01")),
+  Row("BGRA.png", "multichannel", Date.valueOf("2018-01-01")),
+  Row("BGRA_alpha_60.png", "multichannel", Date.valueOf("2018-01-01")),
+  Row("chr30.4.184.jpg", "multichannel", Date.valueOf("2018-02-01")),
+  Row("grayscale.jpg", "multichannel", Date.valueOf("2018-02-01"))
 ))
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatt

[spark] branch branch-3.2 updated: [SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level user documentation

2021-08-16 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 233af3d  [SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level 
user documentation
233af3d is described below

commit 233af3d2396391185c45b69c4bd2bf9ca8f1af67
Author: Venkata krishnan Sowrirajan 
AuthorDate: Mon Aug 16 10:24:40 2021 -0500

[SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level user documentation

### What changes were proposed in this pull request?

Document the push-based shuffle feature with a high-level overview of the
feature and the corresponding configuration options for both the shuffle server
side and the client side. This is how the changes to the doc look in the browser
([img](https://user-images.githubusercontent.com/8871522/129231582-ad86ee2f-246f-4b42-9528-4ccd693e86d2.png))

### Why are the changes needed?

Helps users understand the feature

### Does this PR introduce _any_ user-facing change?

Docs

### How was this patch tested?

N/A

Closes #33615 from venkata91/SPARK-36374.

Authored-by: Venkata krishnan Sowrirajan 
Signed-off-by: Mridul Muralidharan gmail.com>
(cherry picked from commit 2270ecf32f7ae478570145219d2ce71a642076cf)
Signed-off-by: Mridul Muralidharan 
---
 .../apache/spark/network/util/TransportConf.java   |  28 --
 .../shuffle/RemoteBlockPushResolverSuite.java  |   2 +-
 .../org/apache/spark/internal/config/package.scala |  63 ++--
 docs/configuration.md  | 106 +
 4 files changed, 157 insertions(+), 42 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 69b8b25..ed0ca918 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -390,24 +390,32 @@ public class TransportConf {
   /**
* The minimum size of a chunk when dividing a merged shuffle file into 
multiple chunks during
* push-based shuffle.
-   * A merged shuffle file consists of multiple small shuffle blocks. Fetching 
the
-   * complete merged shuffle file in a single response increases the memory 
requirements for the
-   * clients. Instead of serving the entire merged file, the shuffle service 
serves the
-   * merged file in `chunks`. A `chunk` constitutes few shuffle blocks in 
entirety and this
-   * configuration controls how big a chunk can get. A corresponding index 
file for each merged
-   * shuffle file will be generated indicating chunk boundaries.
+   * A merged shuffle file consists of multiple small shuffle blocks. Fetching 
the complete
+   * merged shuffle file in a single disk I/O increases the memory 
requirements for both the
+   * clients and the external shuffle service. Instead, the external shuffle 
service serves
+   * the merged file in MB-sized chunks. This configuration controls how big a 
chunk can get.
+   * A corresponding index file for each merged shuffle file will be generated 
indicating chunk
+   * boundaries.
+   *
+   * Setting this too high would increase the memory requirements on both the 
clients and the
+   * external shuffle service.
+   *
+   * Setting this too low would increase the overall number of RPC requests to 
external shuffle
+   * service unnecessarily.
*/
   public int minChunkSizeInMergedShuffleFile() {
 return Ints.checkedCast(JavaUtils.byteStringAsBytes(
-  conf.get("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "2m")));
+  conf.get("spark.shuffle.push.server.minChunkSizeInMergedShuffleFile", 
"2m")));
   }
 
   /**
-   * The size of cache in memory which is used in push-based shuffle for 
storing merged index files.
+   * The maximum size of cache in memory which is used in push-based shuffle 
for storing merged
+   * index files. This cache is in addition to the one configured via
+   * spark.shuffle.service.index.cache.size.
*/
   public long mergedIndexCacheSize() {
 return JavaUtils.byteStringAsBytes(
-  conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m"));
+  conf.get("spark.shuffle.push.server.mergedIndexCacheSize", "100m"));
   }
 
   /**
@@ -417,7 +425,7 @@ public class TransportConf {
* blocks for this shuffle partition.
*/
   public int ioExceptionsThresholdDuringMerge() {
-    return conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4);
+    return conf.getInt("spark.shuffle.push.server.ioExceptionsThresholdDuringMerge", 4);
   }
 
   /**
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushRe
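
As a reading aid for the TransportConf hunk above, here is a minimal sketch
(not part of the commit) of how the renamed spark.shuffle.push.server.*
properties could be read back through the getters shown in that hunk. The
property values and the class name are made up for illustration, and the
snippet assumes the MapConfigProvider helper from
org.apache.spark.network.util is available alongside network-common.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.network.util.MapConfigProvider;
    import org.apache.spark.network.util.TransportConf;

    public class PushShuffleServerConfSketch {
      public static void main(String[] args) {
        // Illustrative values; the keys are the renamed options from the diff above.
        Map<String, String> props = new HashMap<>();
        props.put("spark.shuffle.push.server.minChunkSizeInMergedShuffleFile", "4m");
        props.put("spark.shuffle.push.server.mergedIndexCacheSize", "200m");
        props.put("spark.shuffle.push.server.ioExceptionsThresholdDuringMerge", "8");

        TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(props));

        // These getters correspond to the methods touched by the hunk above.
        System.out.println(conf.minChunkSizeInMergedShuffleFile());   // 4194304 (4m in bytes)
        System.out.println(conf.mergedIndexCacheSize());              // 209715200 (200m in bytes)
        System.out.println(conf.ioExceptionsThresholdDuringMerge());  // 8
      }
    }

The size strings are resolved through JavaUtils.byteStringAsBytes, exactly as
in the getters above, so "4m" and "200m" come back as byte counts.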

[spark] branch master updated: [SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level user documentation

2021-08-16 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2270ecf  [SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level 
user documentation
2270ecf is described below

commit 2270ecf32f7ae478570145219d2ce71a642076cf
Author: Venkata krishnan Sowrirajan 
AuthorDate: Mon Aug 16 10:24:40 2021 -0500

[SPARK-36374][SHUFFLE][DOC] Push-based shuffle high level user documentation

### What changes were proposed in this pull request?

Document the push-based shuffle feature with a high-level overview of the
feature and the corresponding configuration options for both the shuffle server
side and the client side. This is how the doc changes look in the browser
([img](https://user-images.githubusercontent.com/8871522/129231582-ad86ee2f-246f-4b42-9528-4ccd693e86d2.png)).

### Why are the changes needed?

Helps users understand the feature

### Does this PR introduce _any_ user-facing change?

Docs

### How was this patch tested?

N/A

Closes #33615 from venkata91/SPARK-36374.

Authored-by: Venkata krishnan Sowrirajan 
Signed-off-by: Mridul Muralidharan 
---
 .../apache/spark/network/util/TransportConf.java   |  28 --
 .../shuffle/RemoteBlockPushResolverSuite.java  |   2 +-
 .../org/apache/spark/internal/config/package.scala |  63 ++--
 docs/configuration.md  | 106 +
 4 files changed, 157 insertions(+), 42 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 69b8b25..ed0ca918 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -390,24 +390,32 @@ public class TransportConf {
   /**
* The minimum size of a chunk when dividing a merged shuffle file into 
multiple chunks during
* push-based shuffle.
-   * A merged shuffle file consists of multiple small shuffle blocks. Fetching 
the
-   * complete merged shuffle file in a single response increases the memory 
requirements for the
-   * clients. Instead of serving the entire merged file, the shuffle service 
serves the
-   * merged file in `chunks`. A `chunk` constitutes few shuffle blocks in 
entirety and this
-   * configuration controls how big a chunk can get. A corresponding index 
file for each merged
-   * shuffle file will be generated indicating chunk boundaries.
+   * A merged shuffle file consists of multiple small shuffle blocks. Fetching 
the complete
+   * merged shuffle file in a single disk I/O increases the memory 
requirements for both the
+   * clients and the external shuffle service. Instead, the external shuffle 
service serves
+   * the merged file in MB-sized chunks. This configuration controls how big a 
chunk can get.
+   * A corresponding index file for each merged shuffle file will be generated 
indicating chunk
+   * boundaries.
+   *
+   * Setting this too high would increase the memory requirements on both the 
clients and the
+   * external shuffle service.
+   *
+   * Setting this too low would increase the overall number of RPC requests to 
external shuffle
+   * service unnecessarily.
*/
   public int minChunkSizeInMergedShuffleFile() {
 return Ints.checkedCast(JavaUtils.byteStringAsBytes(
-  conf.get("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "2m")));
+  conf.get("spark.shuffle.push.server.minChunkSizeInMergedShuffleFile", 
"2m")));
   }
 
   /**
-   * The size of cache in memory which is used in push-based shuffle for 
storing merged index files.
+   * The maximum size of cache in memory which is used in push-based shuffle 
for storing merged
+   * index files. This cache is in addition to the one configured via
+   * spark.shuffle.service.index.cache.size.
*/
   public long mergedIndexCacheSize() {
 return JavaUtils.byteStringAsBytes(
-  conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m"));
+  conf.get("spark.shuffle.push.server.mergedIndexCacheSize", "100m"));
   }
 
   /**
@@ -417,7 +425,7 @@ public class TransportConf {
* blocks for this shuffle partition.
*/
   public int ioExceptionsThresholdDuringMerge() {
-    return conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4);
+    return conf.getInt("spark.shuffle.push.server.ioExceptionsThresholdDuringMerge", 4);
   }
 
   /**
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.j
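
To complement the server-side sketch earlier, here is an equally hedged
client-side snippet. It assumes that spark.shuffle.push.enabled is the
executor/client-side switch among the options documented by this change and
that push-based shuffle builds on the external shuffle service; the class name
and the values are purely illustrative.

    import org.apache.spark.SparkConf;

    public class PushShuffleClientConfSketch {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            // Push-based shuffle relies on the external shuffle service being enabled.
            .set("spark.shuffle.service.enabled", "true")
            // Client/executor-side switch for pushing shuffle blocks.
            .set("spark.shuffle.push.enabled", "true");

        System.out.println(conf.get("spark.shuffle.push.enabled"));  // prints "true"
      }
    }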

[spark] branch branch-3.1 updated: Update Spark key negotiation protocol

2021-08-16 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new 1cb0f7d  Update Spark key negotiation protocol
1cb0f7d is described below

commit 1cb0f7db88a4aea20ab95cb07825d13f8e0f25aa
Author: Sean Owen 
AuthorDate: Wed Aug 11 18:04:55 2021 -0500

Update Spark key negotiation protocol
---
 common/network-common/pom.xml  |   4 +
 .../spark/network/crypto/AuthClientBootstrap.java  |   6 +-
 .../apache/spark/network/crypto/AuthEngine.java| 420 +
 .../{ServerResponse.java => AuthMessage.java}  |  56 ++-
 .../spark/network/crypto/AuthRpcHandler.java   |   6 +-
 .../spark/network/crypto/ClientChallenge.java  | 101 -
 .../java/org/apache/spark/network/crypto/README.md | 217 ---
 .../spark/network/crypto/AuthEngineSuite.java  | 182 ++---
 .../spark/network/crypto/AuthMessagesSuite.java|  46 +--
 dev/deps/spark-deps-hadoop-2.7-hive-2.3|   1 +
 dev/deps/spark-deps-hadoop-3.2-hive-2.3|   1 +
 pom.xml|   6 +
 12 files changed, 431 insertions(+), 615 deletions(-)

diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 0dd6619..2233100 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -92,6 +92,10 @@
   commons-crypto
 
 
+  com.google.crypto.tink
+  tink
+
+
   org.roaringbitmap
   RoaringBitmap
 
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
 
b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
index 4428f0f..b555410 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
@@ -105,15 +105,15 @@ public class AuthClientBootstrap implements TransportClientBootstrap {
 
 String secretKey = secretKeyHolder.getSecretKey(appId);
 try (AuthEngine engine = new AuthEngine(appId, secretKey, conf)) {
-  ClientChallenge challenge = engine.challenge();
+  AuthMessage challenge = engine.challenge();
   ByteBuf challengeData = Unpooled.buffer(challenge.encodedLength());
   challenge.encode(challengeData);
 
   ByteBuffer responseData =
    client.sendRpcSync(challengeData.nioBuffer(), conf.authRTTimeoutMs());
-  ServerResponse response = ServerResponse.decodeMessage(responseData);
+  AuthMessage response = AuthMessage.decodeMessage(responseData);
 
-  engine.validate(response);
+  engine.deriveSessionCipher(challenge, response);
   engine.sessionCipher().addToChannel(channel);
 }
   }
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java
 
b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java
index c2b2edc..aadf2b5 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthEngine.java
@@ -17,134 +17,216 @@
 
 package org.apache.spark.network.crypto;
 
+import javax.crypto.spec.SecretKeySpec;
 import java.io.Closeable;
-import java.io.IOException;
-import java.math.BigInteger;
 import java.security.GeneralSecurityException;
 import java.util.Arrays;
 import java.util.Properties;
-import javax.crypto.Cipher;
-import javax.crypto.SecretKey;
-import javax.crypto.SecretKeyFactory;
-import javax.crypto.ShortBufferException;
-import javax.crypto.spec.IvParameterSpec;
-import javax.crypto.spec.PBEKeySpec;
-import javax.crypto.spec.SecretKeySpec;
-import static java.nio.charset.StandardCharsets.UTF_8;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Bytes;
-import org.apache.commons.crypto.cipher.CryptoCipher;
-import org.apache.commons.crypto.cipher.CryptoCipherFactory;
-import org.apache.commons.crypto.random.CryptoRandom;
-import org.apache.commons.crypto.random.CryptoRandomFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import com.google.crypto.tink.subtle.AesGcmJce;
+import com.google.crypto.tink.subtle.Hkdf;
+import com.google.crypto.tink.subtle.Random;
+import com.google.crypto.tink.subtle.X25519;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import static java.nio.charset.StandardCharsets.UTF_8;
 import org.apache.spark.network.util.TransportConf;
 
 /**
- * A helper class for abstracting authentication and key negotiation details. 
This is used by
- * both client and server sides, since the operations are basically the same.
+ * A helper class for abstracting au

[spark] branch master updated (9b9db5a -> f4b31c6)

2021-08-16 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 9b9db5a  [SPARK-36491][SQL] Make from_json/to_json to handle 
timestamp_ntz type properly
 add f4b31c6  [SPARK-36498][SQL] Reorder inner fields of the input query in 
byName V2 write

No new revisions were added by this update.

Summary of changes:
 .../catalyst/analysis/TableOutputResolver.scala| 170 +++--
 ...lysisSuite.scala => V2WriteAnalysisSuite.scala} |  96 
 2 files changed, 222 insertions(+), 44 deletions(-)
 rename 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/{DataSourceV2AnalysisSuite.scala
 => V2WriteAnalysisSuite.scala} (88%)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (8b8d91c -> 9b9db5a)

2021-08-16 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 8b8d91c  [SPARK-36465][SS] Dynamic gap duration in session window
 add 9b9db5a  [SPARK-36491][SQL] Make from_json/to_json to handle 
timestamp_ntz type properly

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/sql/catalyst/json/JacksonGenerator.scala  | 12 
 .../org/apache/spark/sql/catalyst/json/JacksonParser.scala | 12 
 .../sql-tests/results/timestampNTZ/timestamp-ansi.sql.out  |  5 ++---
 .../sql-tests/results/timestampNTZ/timestamp.sql.out   |  5 ++---
 .../scala/org/apache/spark/sql/JsonFunctionsSuite.scala| 14 +-
 5 files changed, 41 insertions(+), 7 deletions(-)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org