Hi,
I ran into an interesting scenario today: no profile output. I have a
PySpark application that primarily uses the Spark SQL APIs. I understand
that parts of the Spark SQL API may not generate data in the PySpark
profile dumps, but I was surprised that code containing a UDF did not
generate any profile output either. I had thought that any time I used a
Python UDF with Spark SQL, that code would have to execute in a Python
interpreter on the executors. Is that not the case? This goes against my
mental model of how Spark works, so I'm trying to understand what causes
the missing profile output. It even made me wonder whether the UDF had
run in the JVM.
I have created a GitHub repo with this code in main.py, plus the example
code from ticket 3478 (https://github.com/apache/spark/pull/2556) in
py_profile.py, which does emit a profile dump:
https://github.com/AlexHagerman/pyspark-profiling
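For reference, PySpark's BasicProfiler is built on cProfile, so whatever lands in the directory set by spark.python.profile.dump can be inspected with the standard-library pstats module. A minimal sketch for checking whether anything was captured at all, assuming the dump files end in .pstats (exact file names vary between Spark versions, so treat that pattern as an assumption); summarize_dumps is a hypothetical helper, not part of PySpark:

```python
import glob
import os
import pstats


def summarize_dumps(dump_dir, limit=10):
    """Print the top `limit` entries (by cumulative time) of every
    *.pstats file in dump_dir and return the file names that were loaded."""
    loaded = []
    # Assumption: dumps end in .pstats (e.g. rdd_<id>.pstats); adjust if not.
    for path in sorted(glob.glob(os.path.join(dump_dir, "*.pstats"))):
        stats = pstats.Stats(path)
        print("=== %s ===" % os.path.basename(path))
        stats.sort_stats("cumulative").print_stats(limit)
        loaded.append(os.path.basename(path))
    if not loaded:
        print("no .pstats files found in %s" % dump_dir)
    return loaded


if __name__ == "__main__":
    summarize_dumps("./main_dump/")
```

An empty return value means the job wrote no profile files to that directory, as opposed to writing files that simply don't mention the UDF.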
Thanks,
Alex
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType
from pyspark.ml.feature import Word2VecModel
from pyspark.ml.linalg import VectorUDT

if __name__ == '__main__':
    spark = SparkSession.builder.appName("token_to_vec") \
        .config("spark.python.profile", "true") \
        .config("spark.python.profile.dump", "./main_dump/") \
        .config("spark.rdd.compress", "true") \
        .config("spark.dynamicAllocation.enabled", "true") \
        .config("spark.serializer",
                "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.kryoserializer", "64") \
        .getOrCreate()
    # NOTE: "spark.kryoserializer" on its own is not a documented Spark
    # setting; something like spark.kryoserializer.buffer.max was probably meant.

    lines_df = spark.read.parquet("./data/token.parquet")

    # Load the trained Word2Vec model and collect its (word, vector) rows
    # to the driver so they can be broadcast as a plain dict.
    vecs = Word2VecModel.load('./data/word_vectors')
    vecs_df = vecs.getVectors()
    vecs_dict = vecs_df.collect()
    vec_dict = spark.sparkContext.broadcast({wv[0]: wv[1] for wv in vecs_dict})
    missing_vec = spark.sparkContext.broadcast(vec_dict.value['MISSING_TOKEN'])

    # Python UDF: map each token to its vector, falling back to MISSING_TOKEN.
    token_to_vec = udf(lambda r: [vec_dict.value.get(w, missing_vec.value)
                                  for w in r], ArrayType(VectorUDT()))
    tdf = lines_df.withColumn("ln_vec", token_to_vec("text"))
    tdf.write.mode("overwrite").parquet(path="./data/token_vecs.parquet",
                                        compression="snappy")

    # Print the accumulated Python profiles to stdout.
    spark.sparkContext.show_profiles()
    spark.stop()
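Because the UDF body is ordinary Python, it can also be profiled outside Spark to confirm it produces cProfile output at all. A minimal sketch, assuming a small hypothetical VEC_DICT of plain lists in place of the broadcast Word2Vec vectors; tokens_to_vecs and profile_locally are illustrative names, not from the repo:

```python
import cProfile
import io
import pstats

# Hypothetical stand-ins for the broadcast values in main.py: plain lists
# instead of pyspark.ml.linalg vectors, with the same MISSING_TOKEN fallback.
VEC_DICT = {
    "spark": [0.1, 0.2],
    "python": [0.3, 0.4],
    "MISSING_TOKEN": [0.0, 0.0],
}
MISSING_VEC = VEC_DICT["MISSING_TOKEN"]


def tokens_to_vecs(tokens):
    """Same logic as the token_to_vec lambda: look up each token and fall
    back to the MISSING_TOKEN vector for unknown words."""
    return [VEC_DICT.get(w, MISSING_VEC) for w in tokens]


def profile_locally(rows):
    """Run tokens_to_vecs over rows under cProfile and return
    (results, report_text)."""
    profiler = cProfile.Profile()
    results = profiler.runcall(lambda: [tokens_to_vecs(r) for r in rows])
    buf = io.StringIO()
    pstats.Stats(profiler, stream=buf).sort_stats("cumulative").print_stats(10)
    return results, buf.getvalue()


if __name__ == "__main__":
    out, report = profile_locally([["spark", "python"], ["jvm"]])
    print(out)
    print(report)
```

If this reports tokens_to_vecs while the Spark job's dump directory stays empty, that would suggest the missing output comes from how profiling is wired into the job rather than from the UDF logic itself.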