[spark] branch master updated: [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

2023-01-11 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e26fa3fef27 [SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle
e26fa3fef27 is described below

commit e26fa3fef273975c00a4668721a95bea1ba7f770
Author: Minchu Yang 
AuthorDate: Thu Jan 12 00:53:16 2023 -0600

[SPARK-33573][SHUFFLE][YARN] Shuffle server side metrics for Push-based shuffle

### What changes were proposed in this pull request?
This is one of the patches for SPARK-33235: Push-based Shuffle Improvement Tasks.
Added a class `PushMergeMetrics` to collect the following metrics on the shuffle server side for push-based shuffle (see the hedged sketch after the list):
- no opportunity responses
- too late responses
- pushed bytes written
- deferred block bytes
- number of deferred blocks
- stale block push requests
- ignored block bytes
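
As a hedged illustration (not part of this commit): these counters are registered through the Dropwizard (codahale) metrics the shuffle service already uses, and whether and where they surface depends on how the NodeManager metrics system is configured. The endpoint, port, and name filters in the sketch below are assumptions for illustration only.

```
import json
import urllib.request

# Hedged sketch: poll a NodeManager's JMX JSON servlet and print attributes of
# shuffle-service beans that look like push-merge counters. The host, the port
# (8042 is the default NodeManager web port), and the substring filters are
# assumptions, not names taken from this commit.
NM_JMX_URL = "http://nodemanager-host:8042/jmx"  # hypothetical host

with urllib.request.urlopen(NM_JMX_URL) as resp:
    beans = json.load(resp)["beans"]

for bean in beans:
    name = bean.get("name", "")
    if "Shuffle" in name:  # crude filter for the external shuffle service bean
        for attr, value in bean.items():
            if any(token in attr for token in ("Block", "Bytes", "Push")):
                print(f"{name}: {attr} = {value}")
```
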
### Why are the changes needed?
This helps to understand push-based shuffle metrics from the shuffle server side.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added a method `verifyMetrics` to verify those metrics in existing unit tests.

Closes #37638 from rmcyang/SPARK-33573-1.

Lead-authored-by: Minchu Yang 
Co-authored-by: Minchu Yang 
Signed-off-by: Mridul gmail.com>
---
 .../network/shuffle/MergedShuffleFileManager.java  |  13 +++
 .../network/shuffle/RemoteBlockPushResolver.java   | 124 +++--
 .../shuffle/RemoteBlockPushResolverSuite.java  |  58 +-
 .../spark/network/yarn/YarnShuffleService.java |   5 +
 docs/monitoring.md |  15 +++
 5 files changed, 203 insertions(+), 12 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
index 051684a92d0..7176b30ba08 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
@@ -18,6 +18,9 @@
 package org.apache.spark.network.shuffle;
 
 import java.io.IOException;
+import java.util.Collections;
+
+import com.codahale.metrics.MetricSet;
 
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.network.buffer.ManagedBuffer;
@@ -126,4 +129,14 @@ public interface MergedShuffleFileManager {
* leveldb for state persistence.
*/
   default void close() {}
+
+  /**
+   * Get the metrics associated with the MergedShuffleFileManager. E.g., this is used to collect
+   * the push merged metrics within RemoteBlockPushResolver.
+   *
+   * @return the map contains the metrics
+   */
+  default MetricSet getMetrics() {
+    return Collections::emptyMap;
+  }
 }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index c3a2e9a883a..6a65e6ccfab 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -29,6 +29,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -48,6 +49,10 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
@@ -133,6 +138,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   @SuppressWarnings("UnstableApiUsage")
   private final LoadingCache indexCache;
 
+  private final PushMergeMetrics pushMergeMetrics;
+
   @VisibleForTesting
   final File recoveryFile;
 
@@ -171,6 +178,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
 dbBackend, Constants.SHUFFLE_SERVICE_DB_BACKEND);
   reloadAndCleanUpAppShuffleInfo(db);
 }
+    this.pushMergeMetrics = new PushMergeMetrics();
   }
 
   @VisibleForTesting
@@ -504,6 +512,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
 

[spark] branch master updated (34dad9582c4 -> 85b979e04ca)

2023-01-11 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 34dad9582c4 [SPARK-41887][CONNECT][TESTS][FOLLOW-UP] Enable test_extended_hint_types test case
 add 85b979e04ca [SPARK-41998][CONNECT][TESTS] Reuse pyspark.sql.tests.test_readwriter test cases

No new revisions were added by this update.

Summary of changes:
 dev/sparktestsupport/modules.py|  1 +
 .../sql/tests/connect/test_parity_readwriter.py| 67 ++
 python/pyspark/sql/tests/test_readwriter.py| 12 +++-
 3 files changed, 78 insertions(+), 2 deletions(-)
 create mode 100644 python/pyspark/sql/tests/connect/test_parity_readwriter.py





[spark] branch master updated (55f64cfa839 -> 34dad9582c4)

2023-01-11 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 55f64cfa839 [SPARK-41968][CORE][SQL] Refactor `ProtobufSerDe` to `ProtobufSerDe[T]`
 add 34dad9582c4 [SPARK-41887][CONNECT][TESTS][FOLLOW-UP] Enable test_extended_hint_types test case

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/tests/connect/test_parity_dataframe.py |  4 
 python/pyspark/sql/tests/test_dataframe.py| 11 +++
 2 files changed, 7 insertions(+), 8 deletions(-)





[spark] branch master updated: [SPARK-41968][CORE][SQL] Refactor `ProtobufSerDe` to `ProtobufSerDe[T]`

2023-01-11 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 55f64cfa839 [SPARK-41968][CORE][SQL] Refactor `ProtobufSerDe` to `ProtobufSerDe[T]`
55f64cfa839 is described below

commit 55f64cfa839a77b8fff7c1625281b84cce4c6807
Author: yangjie01 
AuthorDate: Wed Jan 11 22:21:21 2023 -0800

[SPARK-41968][CORE][SQL] Refactor `ProtobufSerDe` to `ProtobufSerDe[T]`

### What changes were proposed in this pull request?
This PR aims to refactor `ProtobufSerDe` to `ProtobufSerDe[T]`. The main changes to `ProtobufSerDe` are as follows:

- Change the definition of `ProtobufSerDe` to `ProtobufSerDe[T]`
- Remove the `supportClass` method from `ProtobufSerDe[T]` and use reflection in `KVStoreProtobufSerializer` to obtain the actual type of `T` as the `serializerMap` key
- Change the input parameter type of the `serialize` method from `Any` to `T`
- Change the return type of the `deserialize` method from `Any` to `T`

All subclasses of `ProtobufSerDe[T]` are then refactored accordingly, with related code cleanup.

### Why are the changes needed?
Refactor `ProtobufSerDe` and code cleanup.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions.

Closes #39487 from LuciferYang/refactor-ProtobufSerDe.

Authored-by: yangjie01 
Signed-off-by: Gengliang Wang 
---
 .../ApplicationEnvironmentInfoWrapperSerializer.scala | 10 +++---
 .../protobuf/ApplicationInfoWrapperSerializer.scala   |  9 ++---
 .../status/protobuf/CachedQuantileSerializer.scala|  6 ++
 .../ExecutorStageSummaryWrapperSerializer.scala   | 10 +++---
 .../protobuf/ExecutorSummaryWrapperSerializer.scala   | 10 ++
 .../status/protobuf/JobDataWrapperSerializer.scala|  9 ++---
 .../status/protobuf/KVStoreProtobufSerializer.scala   | 15 +++
 .../protobuf/ProcessSummaryWrapperSerializer.scala| 10 ++
 .../apache/spark/status/protobuf/ProtobufSerDe.scala  | 19 +--
 .../protobuf/RDDOperationGraphWrapperSerializer.scala |  7 ++-
 .../protobuf/RDDStorageInfoWrapperSerializer.scala|  9 ++---
 .../protobuf/ResourceProfileWrapperSerializer.scala   |  9 ++---
 .../SpeculationStageSummaryWrapperSerializer.scala| 10 +++---
 .../status/protobuf/StageDataWrapperSerializer.scala  | 13 +
 .../status/protobuf/StreamBlockDataSerializer.scala   |  6 ++
 .../status/protobuf/TaskDataWrapperSerializer.scala   |  9 ++---
 .../protobuf/sql/SQLExecutionUIDataSerializer.scala   |  7 ++-
 .../sql/SparkPlanGraphWrapperSerializer.scala |  7 ++-
 .../protobuf/sql/StreamingQueryDataSerializer.scala   |  9 +++--
 19 files changed, 57 insertions(+), 127 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
index 33a18daacbc..b7cf01382e2 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
@@ -23,14 +23,10 @@ import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest}
 import org.apache.spark.status.ApplicationEnvironmentInfoWrapper
 import org.apache.spark.status.api.v1.{ApplicationEnvironmentInfo, ResourceProfileInfo, RuntimeInfo}
 
-class ApplicationEnvironmentInfoWrapperSerializer extends ProtobufSerDe {
+class ApplicationEnvironmentInfoWrapperSerializer
+  extends ProtobufSerDe[ApplicationEnvironmentInfoWrapper] {
 
-  override val supportClass: Class[_] = classOf[ApplicationEnvironmentInfoWrapper]
-
-  override def serialize(input: Any): Array[Byte] =
-    serialize(input.asInstanceOf[ApplicationEnvironmentInfoWrapper])
-
-  private def serialize(input: ApplicationEnvironmentInfoWrapper): Array[Byte] = {
+  override def serialize(input: ApplicationEnvironmentInfoWrapper): Array[Byte] = {
     val builder = StoreTypes.ApplicationEnvironmentInfoWrapper.newBuilder()
     builder.setInfo(serializeApplicationEnvironmentInfo(input.info))
     builder.build().toByteArray
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala
index 5a2accb7506..c56b5302cc1 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala
@@ -26,14 +26,9 @@ import 

[spark] branch master updated: [SPARK-41996][SQL][SS] Fix kafka test to verify lost partitions to account for slow Kafka operations

2023-01-11 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3d7a5fdf1dd [SPARK-41996][SQL][SS] Fix kafka test to verify lost partitions to account for slow Kafka operations
3d7a5fdf1dd is described below

commit 3d7a5fdf1ddf1e9748b568d66ab366f3c0ff5e55
Author: Anish Shrigondekar 
AuthorDate: Thu Jan 12 12:49:57 2023 +0900

[SPARK-41996][SQL][SS] Fix kafka test to verify lost partitions to account for slow Kafka operations

### What changes were proposed in this pull request?
Fix the Kafka test that verifies lost partitions to account for slow Kafka operations.

Basically, it is possible that Kafka operations around topic deletion, partition creation, etc. exceed the streaming query timeout, thereby failing the query and the test incorrectly. This change updates the exit timeout.

### Why are the changes needed?
Change is required to avoid test flakiness in the event of Kafka operations 
becoming slower

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Test-only change.
Reran the tests multiple times:
```
[info] Assembly jar up to date: /Users/anish.shrigondekar/spark/spark/connector/protobuf/target/scala-2.12/spark-protobuf-assembly-3.4.0-SNAPSHOT.jar
[info] - Query with Trigger.AvailableNow should throw error when topic partitions got unavailable during subsequent batches (6 seconds, 440 milliseconds)
[info] - Query with Trigger.AvailableNow should throw error when offset(s) in planned topic partitions got unavailable during subsequent batches (6 seconds, 331 milliseconds)
[info] Run completed in 17 seconds, 269 milliseconds.
[info] Total number of tests run: 2
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 52 s, completed Jan 11, 2023, 6:05:24
```

Closes #39520 from anishshri-db/SPARK-41996.

Authored-by: Anish Shrigondekar 
Signed-off-by: Jungtaek Lim 
---
 .../apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 10 --
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 18d63a9a4ef..d63b9805e55 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -358,10 +358,13 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with
         .start()
     }
 
+    // SPARK-41996 - Increase query termination timeout to ensure that
+    // Kafka operations can be completed
+    val queryTimeout = 300.seconds
     val exc = intercept[Exception] {
       val query = startTriggerAvailableNowQuery()
       try {
-        assert(query.awaitTermination(streamingTimeout.toMillis))
+        assert(query.awaitTermination(queryTimeout.toMillis))
       } finally {
         query.stop()
       }
@@ -409,10 +412,13 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with
         .start()
     }
 
+    // SPARK-41996 - Increase query termination timeout to ensure that
+    // Kafka operations can be completed
+    val queryTimeout = 300.seconds
     val exc = intercept[StreamingQueryException] {
       val query = startTriggerAvailableNowQuery()
       try {
-        assert(query.awaitTermination(streamingTimeout.toMillis))
+        assert(query.awaitTermination(queryTimeout.toMillis))
       } finally {
         query.stop()
       }





[spark] branch master updated (47afefb6679 -> 63479a235a1)

2023-01-11 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 47afefb6679 [SPARK-41980][CONNECT][TESTS] Enable test_functions_broadcast in functions parity test
 add 63479a235a1 [SPARK-41635][SQL] Fix group by all error reporting

No new revisions were added by this update.

Summary of changes:
 .../sql/catalyst/analysis/ResolveGroupByAll.scala  |  5 +--
 .../resources/sql-tests/inputs/group-by-all.sql| 11 +++---
 .../sql-tests/results/group-by-all.sql.out | 40 +-
 3 files changed, 42 insertions(+), 14 deletions(-)





[spark] branch master updated: [SPARK-41980][CONNECT][TESTS] Enable test_functions_broadcast in functions parity test

2023-01-11 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 47afefb6679 [SPARK-41980][CONNECT][TESTS] Enable test_functions_broadcast in functions parity test
47afefb6679 is described below

commit 47afefb66794df16399e34791a68bdec9885778e
Author: Hyukjin Kwon 
AuthorDate: Thu Jan 12 10:31:40 2023 +0900

[SPARK-41980][CONNECT][TESTS] Enable test_functions_broadcast in functions parity test

### What changes were proposed in this pull request?

This PR enables `test_functions_broadcast` back by avoiding `_jdf` access in the original test.
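
As a minimal sketch of the approach (not the patch itself), assuming a local `SparkSession`, the presence of a broadcast join can be checked from the captured `explain()` output instead of the private `_jdf` handle:

```
import io
from contextlib import redirect_stdout

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

# Minimal sketch: detect a broadcast join by parsing explain() output rather
# than reaching into the JVM via the private _jdf attribute.
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([(1, "1"), (2, "2")], ("key", "value"))
df2 = spark.createDataFrame([(1, "1"), (2, "2")], ("key", "value"))

with io.StringIO() as buf, redirect_stdout(buf):
    df1.join(broadcast(df2), "key").explain(True)  # explain() prints the plans to stdout
    assert buf.getvalue().count("Broadcast") >= 1  # plan text should mention a broadcast
```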

### Why are the changes needed?

For test coverage and feature parity.

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Fixed unittests.

Closes #39500 from HyukjinKwon/SPARK-41980.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/tests/connect/test_parity_functions.py |  6 --
 python/pyspark/sql/tests/test_functions.py| 14 +-
 2 files changed, 9 insertions(+), 11 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_parity_functions.py b/python/pyspark/sql/tests/connect/test_parity_functions.py
index 2f6ed05559f..dd7229d158f 100644
--- a/python/pyspark/sql/tests/connect/test_parity_functions.py
+++ b/python/pyspark/sql/tests/connect/test_parity_functions.py
@@ -40,12 +40,6 @@ class FunctionsParityTests(FunctionsTestsMixin, ReusedConnectTestCase):
     def test_function_parity(self):
         super().test_function_parity()
 
-    @unittest.skip(
-        "Spark Connect does not support Spark Context, _jdf but the test depends on that."
-    )
-    def test_functions_broadcast(self):
-        super().test_functions_broadcast()
-
     @unittest.skip("Spark Connect does not support Spark Context but the test depends on that.")
     def test_input_file_name_reset_for_rdd(self):
         super().test_input_file_name_reset_for_rdd()
diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py
index 4db1eed1eb1..38a4e3e6644 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -16,6 +16,8 @@
 #
 
 import datetime
+import io
+from contextlib import redirect_stdout
 from inspect import getmembers, isfunction
 from itertools import chain
 import re
@@ -452,15 +454,17 @@ class FunctionsTestsMixin:
         df2 = self.spark.createDataFrame([(1, "1"), (2, "2")], ("key", "value"))
 
         # equijoin - should be converted into broadcast join
-        plan1 = df1.join(broadcast(df2), "key")._jdf.queryExecution().executedPlan()
-        self.assertEqual(1, plan1.toString().count("BroadcastHashJoin"))
+        with io.StringIO() as buf, redirect_stdout(buf):
+            df1.join(broadcast(df2), "key").explain(True)
+            self.assertGreaterEqual(buf.getvalue().count("Broadcast"), 1)
 
         # no join key -- should not be a broadcast join
-        plan2 = df1.crossJoin(broadcast(df2))._jdf.queryExecution().executedPlan()
-        self.assertEqual(0, plan2.toString().count("BroadcastHashJoin"))
+        with io.StringIO() as buf, redirect_stdout(buf):
+            df1.crossJoin(broadcast(df2)).explain(True)
+            self.assertGreaterEqual(buf.getvalue().count("Broadcast"), 1)
 
         # planner should not crash without a join
-        broadcast(df1)._jdf.queryExecution().executedPlan()
+        broadcast(df1).explain(True)
 
     def test_first_last_ignorenulls(self):
         from pyspark.sql import functions





[spark] branch master updated: [SPARK-41591][PYTHON][ML] Training PyTorch Files on Single Node Multi GPU

2023-01-11 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6b800ba8461 [SPARK-41591][PYTHON][ML] Training PyTorch Files on Single Node Multi GPU
6b800ba8461 is described below

commit 6b800ba8461935a205d8c15eba2ff11f141dea47
Author: Rithwik Ediga Lakhamsani 
AuthorDate: Thu Jan 12 08:42:01 2023 +0900

[SPARK-41591][PYTHON][ML] Training PyTorch Files on Single Node Multi GPU

### What changes were proposed in this pull request?

This is an addition to https://github.com/apache/spark/pull/39146 to add support for single node training using PyTorch files. The users would follow the second workflow in the [design document](https://docs.google.com/document/d/1QPO1Ly8WteL6aIPvVcR7Xne9qVtJiB3fdrRn7NwBcpA/edit#heading=h.8yvw9xq428fh) to run training. I added some new utility functions as well as built on top of current functions.
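
As a hedged usage sketch (not taken from this commit): the constructor parameters below appear in the diff, while the `run()` entry point, the script path, and the flags are assumptions about the eventual user-facing API.

```
from pyspark.ml.torch.distributor import TorchDistributor

# Hedged sketch: launch single-node, multi-GPU training of a PyTorch script.
# num_processes/local_mode/use_gpu are the constructor parameters visible in
# the diff; run(), the script path, and the CLI flags are assumptions.
distributor = TorchDistributor(
    num_processes=2,   # e.g. one process per GPU on the driver node
    local_mode=True,   # single-node training, the workflow added here
    use_gpu=True,
)
distributor.run("/path/to/train.py", "--learning-rate", "1e-3")  # hypothetical script and args
```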

### Why are the changes needed?

Look at the [main ticket](https://issues.apache.org/jira/browse/SPARK-41589) for more details.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Some unit tests were added and integration tests will be added in a later PR (https://issues.apache.org/jira/browse/SPARK-41777).

Closes #39188 from rithwik-db/pytorch-file-local-training.

Authored-by: Rithwik Ediga Lakhamsani 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/ml/torch/distributor.py | 186 -
 python/pyspark/ml/torch/tests/test_distributor.py  | 147 +++-
 .../pyspark/ml/torch/torch_run_process_wrapper.py  |  83 +
 3 files changed, 412 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/ml/torch/distributor.py b/python/pyspark/ml/torch/distributor.py
index 2a4027cbb25..80d5ad31c3c 100644
--- a/python/pyspark/ml/torch/distributor.py
+++ b/python/pyspark/ml/torch/distributor.py
@@ -15,7 +15,16 @@
 # limitations under the License.
 #
 
+import collections
+import ctypes
 import math
+import os
+import random
+import re
+import signal
+import sys
+import subprocess
+import time
 from typing import Union, Callable, Optional, Any
 import warnings
 
@@ -34,8 +43,8 @@ def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:
 
     Parameters
     ----------
-    sc : SparkContext
-        The SparkContext for the distributor.
+    sc : :class:`SparkContext`
+        The :class:`SparkContext` for the distributor.
     key : str
         string for conf name
     default_value : str
@@ -64,6 +73,42 @@ def get_conf_boolean(sc: SparkContext, key: str, default_value: str) -> bool:
 )
 
 
+def get_gpus_owned(sc: SparkContext) -> list[str]:
+    """Gets the number of GPUs that Spark scheduled to the calling task.
+
+    Parameters
+    ----------
+    sc : :class:`SparkContext`
+        The :class:`SparkContext` that has GPUs available.
+
+    Returns
+    -------
+    list
+        The correct mapping of addresses to workers.
+
+    Raises
+    ------
+    ValueError
+        Raised if the input addresses were not found.
+    """
+    CUDA_VISIBLE_DEVICES = "CUDA_VISIBLE_DEVICES"
+    pattern = re.compile("^[1-9][0-9]*|0$")
+    addresses = sc.resources["gpu"].addresses
+    if any(not pattern.match(address) for address in addresses):
+        raise ValueError(
+            f"Found GPU addresses {addresses} which "
+            "are not all in the correct format "
+            "for CUDA_VISIBLE_DEVICES, which requires "
+            "integers with no zero padding."
+        )
+    if CUDA_VISIBLE_DEVICES in os.environ:
+        gpu_indices = list(map(int, addresses))
+        gpu_list = os.environ[CUDA_VISIBLE_DEVICES].split(",")
+        gpu_owned = [gpu_list[i] for i in gpu_indices]
+        return gpu_owned
+    return addresses
+
+
 class Distributor:
 """
     The parent class for TorchDistributor. This class shouldn't be instantiated directly.
@@ -85,6 +130,12 @@ class Distributor:
         self.num_tasks = self._get_num_tasks()
         self.ssl_conf = None
 
+    def _create_input_params(self) -> dict[str, Any]:
+        input_params = self.__dict__.copy()
+        for unneeded_param in ["spark", "sc", "ssl_conf"]:
+            del input_params[unneeded_param]
+        return input_params
+
     def _get_num_tasks(self) -> int:
         """
         Returns the number of Spark tasks to use for distributed training
@@ -261,6 +312,130 @@ class TorchDistributor(Distributor):
         super().__init__(num_processes, local_mode, use_gpu)
         self.ssl_conf = "pytorch.spark.distributor.ignoreSsl"  # type: ignore
         self._validate_input_params()
+        self.input_params = self._create_input_params()
+
+    @staticmethod
+    def _create_torchrun_command(
+        input_params: 

[spark] branch master updated: [SPARK-41047][SQL] Improve docs for round

2023-01-11 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 073b23ef2a9 [SPARK-41047][SQL] Improve docs for round
073b23ef2a9 is described below

commit 073b23ef2a982370e0ff8836d55361bca3320e37
Author: panbingkun 
AuthorDate: Wed Jan 11 16:39:25 2023 -0600

[SPARK-41047][SQL] Improve docs for round

### What changes were proposed in this pull request?
The PR aims to improve the docs for `round`.

### Why are the changes needed?
Reduce user misunderstanding.
It is not necessary to enumerate the legacy usage in the example.
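
For context, the example retained in the docs can be checked quickly (a sketch, assuming an active local `SparkSession`):

```
from pyspark.sql import SparkSession

# Sketch only: exercise the round() example kept in the ExpressionDescription below.
spark = SparkSession.builder.getOrCreate()
spark.sql("SELECT round(2.5, 0)").show()  # expected value: 3, matching the doc example
```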

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.
Manual check.

Closes #39511 from panbingkun/SPARK-41047.

Authored-by: panbingkun 
Signed-off-by: Sean Owen 
---
 .../org/apache/spark/sql/catalyst/expressions/mathExpressions.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
index 16f081a0cc2..9ffc148180a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
@@ -1644,8 +1644,6 @@ abstract class RoundBase(child: Expression, scale: Expression,
     Examples:
       > SELECT _FUNC_(2.5, 0);
        3
-      > SELECT _FUNC_(25, -1);
-       30
   """,
   since = "1.5.0",
   group = "math_funcs")





[spark] branch master updated: [MINOR][SQL][YARN] Fix a typo: less then -> less than

2023-01-11 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c7ae0657f93 [MINOR][SQL][YARN] Fix a typo: less then -> less than
c7ae0657f93 is described below

commit c7ae0657f93639dfe1b2996d94f5cabd16adc65d
Author: Yuming Wang 
AuthorDate: Wed Jan 11 11:20:33 2023 -0600

[MINOR][SQL][YARN] Fix a typo: less then -> less than

### What changes were proposed in this pull request?

Fix a typo: less then -> less than.

### Why are the changes needed?

Fix typo.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

N/A.

Closes #39513 from wangyum/typo.

Authored-by: Yuming Wang 
Signed-off-by: Sean Owen 
---
 .../src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala   | 2 +-
 .../main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala  | 4 ++--
 .../org/apache/spark/sql/execution/BaseScriptTransformationExec.scala | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index da4dd0cbb6b..313b19f919d 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -705,7 +705,7 @@ private[yarn] class YarnAllocator(
       containersToUse: ArrayBuffer[Container],
       remaining: ArrayBuffer[Container]): Unit = {
     // Match on the exact resource we requested so there shouldn't be a mismatch,
-    // we are relying on YARN to return a container with resources no less then we requested.
+    // we are relying on YARN to return a container with resources no less than we requested.
     // If we change this, or starting validating the container, be sure the logic covers SPARK-6050.
     val rpId = getResourceProfileIdFromPriority(allocatedContainer.getPriority)
     val resourceForRP = rpIdToYarnResource.get(rpId)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 960d7b4599b..eb85aee25ce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1707,9 +1707,9 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
    * - Null-safe Equal: '<=>'
    * - Not Equal: '<>' or '!='
    * - Less than: '<'
-   * - Less then or Equal: '<='
+   * - Less than or Equal: '<='
    * - Greater than: '>'
-   * - Greater then or Equal: '>='
+   * - Greater than or Equal: '>='
    */
   override def visitComparison(ctx: ComparisonContext): Expression = withOrigin(ctx) {
     val left = expression(ctx.left)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
index bfc2bc7cd11..99d59901d58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
@@ -123,8 +123,8 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
           .map { case (data, writer) => writer(data) })
       } else {
         // In schema less mode, hive will choose first two output column as output.
-        // If output column size less then 2, it will return NULL for columns with missing values.
-        // Here we split row string and choose first 2 values, if values's size less then 2,
+        // If output column size less than 2, it will return NULL for columns with missing values.
+        // Here we split row string and choose first 2 values, if values's size less than 2,
         // we pad NULL value until 2 to make behavior same with hive.
         val kvWriter = CatalystTypeConverters.createToCatalystConverter(StringType)
         prevLine: String =>





[spark] branch master updated: [SPARK-41977][SPARK-41978][CONNECT] SparkSession.range to take float as arguments

2023-01-11 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e821d84409f [SPARK-41977][SPARK-41978][CONNECT] SparkSession.range to take float as arguments
e821d84409f is described below

commit e821d84409f00e03f9469c9e8e7040e9cc5a5d9f
Author: Hyukjin Kwon 
AuthorDate: Thu Jan 12 00:23:51 2023 +0900

[SPARK-41977][SPARK-41978][CONNECT] SparkSession.range to take float as arguments

### What changes were proposed in this pull request?

This PR proposes that Spark Connect's `SparkSession.range` accept floats, e.g., `spark.range(10e10)`.
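
A small sketch of the behaviour (the `int()` coercion mirrors the diff below; the Connect endpoint is a hypothetical placeholder):

```
from pyspark.sql import SparkSession

# Sketch: float arguments such as 10e10 are accepted by the Connect client
# because start/end/step (and numPartitions, when given) are coerced with int()
# before the Range plan is built. The sc:// address is an assumption.
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
df = spark.range(10e10)          # 10e10 is a Python float
print(df.schema)                 # the usual single LongType `id` column
print(df.limit(3).collect())     # e.g. [Row(id=0), Row(id=1), Row(id=2)]
```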

### Why are the changes needed?

For feature parity.

### Does this PR introduce _any_ user-facing change?

No to end users since Spark Connect has not been released yet.
`SparkSession.range` allows floats.

### How was this patch tested?

Unittests enabled back.

Closes #39499 from HyukjinKwon/SPARK-41977.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/session.py   |  8 +++-
 .../pyspark/sql/tests/connect/test_parity_dataframe.py  |  4 
 python/pyspark/sql/tests/test_dataframe.py  | 17 +
 3 files changed, 16 insertions(+), 13 deletions(-)

diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index 4c5ea3da10e..618608d64f7 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -333,8 +333,14 @@ class SparkSession:
         else:
             actual_end = end
 
+        if numPartitions is not None:
+            numPartitions = int(numPartitions)
+
         return DataFrame.withPlan(
-            Range(start=start, end=actual_end, step=step, num_partitions=numPartitions), self
+            Range(
+                start=int(start), end=int(actual_end), step=int(step), num_partitions=numPartitions
+            ),
+            self,
         )
 
     range.__doc__ = PySparkSession.range.__doc__
diff --git a/python/pyspark/sql/tests/connect/test_parity_dataframe.py b/python/pyspark/sql/tests/connect/test_parity_dataframe.py
index 32fb6216ba9..5c3e4ee1a01 100644
--- a/python/pyspark/sql/tests/connect/test_parity_dataframe.py
+++ b/python/pyspark/sql/tests/connect/test_parity_dataframe.py
@@ -61,10 +61,6 @@ class DataFrameParityTests(DataFrameTestsMixin, ReusedConnectTestCase):
     def test_extended_hint_types(self):
         super().test_extended_hint_types()
 
-    @unittest.skip("Spark Connect does not support JVM function _jdf but the tests depend on them")
-    def test_generic_hints(self):
-        super().test_generic_hints()
-
     @unittest.skip("Spark Connect does not support RDD but the tests depend on them.")
     def test_help_command(self):
         super().test_help_command()
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index 2a82b0ab90d..e83ecbf2e6e 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -23,6 +23,8 @@ import tempfile
 import time
 import unittest
 from typing import cast
+import io
+from contextlib import redirect_stdout
 
 from pyspark.sql import SparkSession, Row
 from pyspark.sql.functions import col, lit, count, sum, mean, struct
@@ -534,20 +536,17 @@ class DataFrameTestsMixin:
         self.assertRaises(Exception, self.df.withColumns)
 
     def test_generic_hints(self):
-        from pyspark.sql import DataFrame
-
         df1 = self.spark.range(10e10).toDF("id")
         df2 = self.spark.range(10e10).toDF("id")
 
-        self.assertIsInstance(df1.hint("broadcast"), DataFrame)
-        self.assertIsInstance(df1.hint("broadcast", []), DataFrame)
+        self.assertIsInstance(df1.hint("broadcast"), type(df1))
 
         # Dummy rules
-        self.assertIsInstance(df1.hint("broadcast", "foo", "bar"), DataFrame)
-        self.assertIsInstance(df1.hint("broadcast", ["foo", "bar"]), DataFrame)
+        self.assertIsInstance(df1.hint("broadcast", "foo", "bar"), type(df1))
 
-        plan = df1.join(df2.hint("broadcast"), "id")._jdf.queryExecution().executedPlan()
-        self.assertEqual(1, plan.toString().count("BroadcastHashJoin"))
+        with io.StringIO() as buf, redirect_stdout(buf):
+            df1.join(df2.hint("broadcast"), "id").explain(True)
+            self.assertEqual(1, buf.getvalue().count("BroadcastHashJoin"))
 
     # add tests for SPARK-23647 (test more types for hint)
     def test_extended_hint_types(self):
@@ -556,6 +555,8 @@ class DataFrameTestsMixin:
         hinted_df = df.hint("my awesome hint", 1.2345, "what", such_a_nice_list)
         logical_plan = hinted_df._jdf.queryExecution().logical()
 
+        self.assertIsInstance(df.hint("broadcast",