holdenk commented on code in PR #52679:
URL: https://github.com/apache/spark/pull/52679#discussion_r2482828853
##########
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala:
##########
@@ -38,7 +38,9 @@ import org.apache.spark.internal.config.Python.PYTHON_FACTORY_IDLE_WORKER_MAX_PO
 import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util.{RedirectThread, Utils}
-case class PythonWorker(channel: SocketChannel) {
+case class PythonWorker(
+    channel: SocketChannel,
+    extraChannel: Option[SocketChannel] = None) {
Review Comment:
Let's call this profile data channel or something?
##########
python/pyspark/worker.py:
##########
@@ -3167,7 +3170,28 @@ def func(_, it):
     return func, None, ser, ser
+def write_profile(outfile):
+    import yappi
+
+    while True:
+        stats = []
+        for thread in yappi.get_thread_stats():
+            data = list(yappi.get_func_stats(ctx_id=thread.id))
+            stats.extend([{str(k): str(v) for k, v in d.items()} for d in data])
+        pickled = pickle.dumps(stats)
Review Comment:
Why pickle? Would JSON maybe make more sense so we can interpret it more
easily in, say, the Spark UI in the future?
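For reference, a rough sketch of what a JSON variant could look like (`write_profile_json` is a hypothetical name; it reuses the existing `write_with_length` helper from `pyspark.serializers` and is not code from this PR):

```python
import json

import yappi

from pyspark.serializers import write_with_length


def write_profile_json(outfile):
    # Same collection loop as the PR, but the payload is JSON so the JVM
    # (or a future Spark UI view) can parse it without unpickling Python objects.
    stats = []
    for thread in yappi.get_thread_stats():
        data = list(yappi.get_func_stats(ctx_id=thread.id))
        # Stringify keys and values so every entry is JSON-serializable.
        stats.extend([{str(k): str(v) for k, v in d.items()} for d in data])
    payload = json.dumps(stats).encode("utf-8")
    write_with_length(payload, outfile)
    outfile.flush()
```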
##########
python/pyspark/worker.py:
##########
@@ -3167,7 +3170,28 @@ def func(_, it):
     return func, None, ser, ser
+def write_profile(outfile):
+    import yappi
+
+    while True:
+        stats = []
+        for thread in yappi.get_thread_stats():
+            data = list(yappi.get_func_stats(ctx_id=thread.id))
+            stats.extend([{str(k): str(v) for k, v in d.items()} for d in data])
+        pickled = pickle.dumps(stats)
+        write_with_length(pickled, outfile)
+        outfile.flush()
+        time.sleep(1)
+
+
 def main(infile, outfile):
Review Comment:
Maybe rename to outputs
##########
python/pyspark/worker.py:
##########
@@ -3167,7 +3170,28 @@ def func(_, it):
     return func, None, ser, ser
+def write_profile(outfile):
+    import yappi
+
+    while True:
+        stats = []
+        for thread in yappi.get_thread_stats():
+            data = list(yappi.get_func_stats(ctx_id=thread.id))
+            stats.extend([{str(k): str(v) for k, v in d.items()} for d in data])
+        pickled = pickle.dumps(stats)
+        write_with_length(pickled, outfile)
+        outfile.flush()
+        time.sleep(1)
Review Comment:
I know yappi says it's fast, but a 1 second busy loop seems like it might be
overkill, or should at least be configurable?
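As a sketch of one way to make it configurable (PYSPARK_RUNTIME_PROFILE_INTERVAL is a hypothetical env var name, not something this PR defines):

```python
import os


def profile_flush_interval(default=1.0):
    # Read the flush interval in seconds from the environment, falling back
    # to the current 1 second default when unset or malformed.
    try:
        return float(os.environ.get("PYSPARK_RUNTIME_PROFILE_INTERVAL", default))
    except (TypeError, ValueError):
        return default
```

Inside `write_profile`, the hard-coded `time.sleep(1)` would then become `time.sleep(profile_flush_interval())`.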
##########
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:
##########
@@ -281,6 +281,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     }
     // allow the user to set the batch size for the BatchedSerializer on UDFs
     envVars.put("PYTHON_UDF_BATCH_SIZE", batchSizeForPythonUDF.toString)
+    envVars.put("PYSPARK_RUNTIME_PROFILE", true.toString)
Review Comment:
obviously make configurable later
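As a sketch of how the Python worker side could key off this once it is configurable (the helper below is an assumption about how `main()` might wire it up, not code from this PR):

```python
import os
import threading


def maybe_start_profile_thread(target, profile_outfile):
    # Only start the profile writer thread when the JVM opted in via the
    # PYSPARK_RUNTIME_PROFILE environment variable set in PythonRunner.
    if os.environ.get("PYSPARK_RUNTIME_PROFILE", "false").lower() != "true":
        return None
    thread = threading.Thread(target=target, args=(profile_outfile,), daemon=True)
    thread.start()
    return thread
```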