HyukjinKwon commented on code in PR #48843:
URL: https://github.com/apache/spark/pull/48843#discussion_r1858318060
##########
python/pyspark/util.py:
##########
@@ -406,13 +408,41 @@ def inner(*args: Any, **kwargs: Any) -> Any:
         return outer
-    # Non Spark Connect
+    # Non Spark Connect with SparkSession or Callable
+    from pyspark.sql import SparkSession
     from pyspark import SparkContext
     from py4j.clientserver import ClientServer
     if isinstance(SparkContext._gateway, ClientServer):
         # Here's when the pinned-thread mode (PYSPARK_PIN_THREAD) is on.
+        if isinstance(f, SparkSession):
+            session = f
+            assert session is not None
+            tags = set(session.getTags())
+            # Local properties are copied when wrapping the function.
+            assert SparkContext._active_spark_context is not None
+            properties = SparkContext._active_spark_context._jsc.sc().getLocalProperties().clone()
+
+            def outer(ff: Callable) -> Callable:
+                @functools.wraps(ff)
+                def wrapped(*args: Any, **kwargs: Any) -> Any:
+                    # Apply properties and tags in the child thread.
+                    assert SparkContext._active_spark_context is not None
+                    SparkContext._active_spark_context._jsc.sc().setLocalProperties(properties)
+                    for tag in tags:
+                        session.addTag(tag)  # type: ignore[union-attr]
+                    return ff(*args, **kwargs)
+
+                return wrapped
+
+            return outer
+
+        warnings.warn(
+            "Spark Connect session is not provided. Tags will not be inherited.",
Review Comment:
```suggestion
"Spark session is not provided. Tags will not be inherited.",
```
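
For context, a minimal usage sketch of what this hunk enables, assuming it belongs to `inheritable_thread_target` in `python/pyspark/util.py` and that the helper now also accepts a classic (non-Connect) `SparkSession`; the tag name and query below are illustrative only, not part of the patch:

```python
# Sketch only: assumes the patched inheritable_thread_target accepts a classic
# SparkSession and re-applies the parent thread's tags and local properties
# inside the child thread, as the hunk above does.
import threading

from pyspark import inheritable_thread_target
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.addTag("parent-tag")  # tag set in the parent (caller) thread

@inheritable_thread_target(spark)  # wraps the function, copying tags/properties
def run_query() -> None:
    # "parent-tag" is re-added here before the query runs, so jobs started
    # from this child thread can be identified (or cancelled) by that tag.
    spark.range(10).count()

t = threading.Thread(target=run_query)
t.start()
t.join()
```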