holdenk commented on code in PR #52679:
URL: https://github.com/apache/spark/pull/52679#discussion_r2482828853


##########
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala:
##########
@@ -38,7 +38,9 @@ import org.apache.spark.internal.config.Python.PYTHON_FACTORY_IDLE_WORKER_MAX_PO
 import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util.{RedirectThread, Utils}
 
-case class PythonWorker(channel: SocketChannel) {
+case class PythonWorker(
+    channel: SocketChannel,
+    extraChannel: Option[SocketChannel] = None) {

Review Comment:
   Let's call this `profileDataChannel` or something similar?



##########
python/pyspark/worker.py:
##########
@@ -3167,7 +3170,28 @@ def func(_, it):
     return func, None, ser, ser
 
 
+def write_profile(outfile):
+    import yappi
+
+    while True:
+        stats = []
+        for thread in yappi.get_thread_stats():
+            data = list(yappi.get_func_stats(ctx_id=thread.id))
+            stats.extend([{str(k): str(v) for k, v in d.items()} for d in data])
+        pickled = pickle.dumps(stats)

Review Comment:
   Why pickle? Would JSON maybe make more sense, so we can interpret it more easily in, say, the Spark UI in the future? Rough sketch of what that could look like below.
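   
   A hypothetical JSON variant of `write_profile` (not part of the PR; it assumes the stats stay as the string-keyed dicts built above, and reuses the existing `write_with_length` framing from `pyspark.serializers`):
   
   ```python
   import json
   import time
   
   from pyspark.serializers import write_with_length
   
   
   def write_profile_json(outfile):
       # Hypothetical sketch: the per-function stats are already plain
       # dicts of strings, so they serialize to JSON directly, and a
       # future Spark UI consumer would not need Python to decode them.
       import yappi
   
       while True:
           stats = []
           for thread in yappi.get_thread_stats():
               data = list(yappi.get_func_stats(ctx_id=thread.id))
               stats.extend([{str(k): str(v) for k, v in d.items()} for d in data])
           # UTF-8-encode the JSON document; write_with_length prefixes the
           # payload with its length, the same framing as the pickle version.
           write_with_length(json.dumps(stats).encode("utf-8"), outfile)
           outfile.flush()
           time.sleep(1)
   ```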



##########
python/pyspark/worker.py:
##########
@@ -3167,7 +3170,28 @@ def func(_, it):
     return func, None, ser, ser
 
 
+def write_profile(outfile):
+    import yappi
+
+    while True:
+        stats = []
+        for thread in yappi.get_thread_stats():
+            data = list(yappi.get_func_stats(ctx_id=thread.id))
+            stats.extend([{str(k): str(v) for k, v in d.items()} for d in data])
+        pickled = pickle.dumps(stats)
+        write_with_length(pickled, outfile)
+        outfile.flush()
+        time.sleep(1)
+
+
 def main(infile, outfile):

Review Comment:
   Maybe rename to `outputs`?



##########
python/pyspark/worker.py:
##########
@@ -3167,7 +3170,28 @@ def func(_, it):
     return func, None, ser, ser
 
 
+def write_profile(outfile):
+    import yappi
+
+    while True:
+        stats = []
+        for thread in yappi.get_thread_stats():
+            data = list(yappi.get_func_stats(ctx_id=thread.id))
+            stats.extend([{str(k): str(v) for k, v in d.items()} for d in data])
+        pickled = pickle.dumps(stats)
+        write_with_length(pickled, outfile)
+        outfile.flush()
+        time.sleep(1)

Review Comment:
   I know yappi says it's fast, but a fixed 1-second reporting loop seems like it may be overkill, or should at least be configurable? One possible sketch below.
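   
   One way it could be made configurable, sketched here with a hypothetical environment variable (`PYSPARK_PROFILE_INTERVAL` is not in the PR):
   
   ```python
   import os
   
   
   def _profile_interval(default=1.0):
       # Hypothetical knob: read the reporting interval from the worker's
       # environment so the driver can tune it, falling back to the
       # 1-second default used in the PR if unset or malformed.
       try:
           return float(os.environ.get("PYSPARK_PROFILE_INTERVAL", str(default)))
       except ValueError:
           return default
   ```
   
   `write_profile`'s loop would then call `time.sleep(_profile_interval())` instead of the hardcoded `time.sleep(1)`.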



##########
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:
##########
@@ -281,6 +281,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     }
     // allow the user to set the batch size for the BatchedSerializer on UDFs
     envVars.put("PYTHON_UDF_BATCH_SIZE", batchSizeForPythonUDF.toString)
+    envVars.put("PYSPARK_RUNTIME_PROFILE", true.toString)

Review Comment:
   Obviously, make this configurable later.
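   
   Until then, a sketch of how the worker side might consume the flag (illustrative only; `maybe_start_profile_writer` is not in the PR, and `write_profile` is the function from the worker.py diff above):
   
   ```python
   import os
   import threading
   
   
   def maybe_start_profile_writer(outfile):
       # Illustrative gate on PYSPARK_RUNTIME_PROFILE: once the Scala side
       # stops hardcoding "true", the worker only spawns the reporting
       # thread when the flag is actually set.
       if os.environ.get("PYSPARK_RUNTIME_PROFILE", "false").lower() == "true":
           t = threading.Thread(target=write_profile, args=(outfile,), daemon=True)
           t.start()
           return t
       return None
   ```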


