eddyxu edited a comment on pull request #31735:
URL: https://github.com/apache/spark/pull/31735#issuecomment-795810989


   @HyukjinKwon this PR does not yet support passing a UDT into a pandas UDF as an input parameter. I am also not sure how severe `the performance was pretty bad.` actually is.
   
   Btw, I did remove the `typing dispatch` approach. In this PR, other than `applying the deconstruction of a UDT` in f488fb6f95fc62225d12b7895ca4a17081a7de2f6c4dc77272c7bd6694649706R166-R170, there is no per-row detection of UDTs.
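   
   For reference, here is a minimal sketch of what that deconstruction step roughly amounts to. This is an illustration, not the PR's actual code: `deconstruct_udt_series` is a hypothetical helper, while `to_arrow_type` is pyspark's existing `DataType`-to-arrow converter.
   
   ```python
   import pandas as pd
   import pyarrow as pa
   
   from pyspark.sql.pandas.types import to_arrow_type
   
   def deconstruct_udt_series(series: pd.Series, udt) -> pa.Array:
       # Serialize each UDT value into its sqlType representation; this is
       # necessarily per-row, since only the UDT knows its own layout.
       values = [udt.serialize(v) if v is not None else None for v in series]
       return pa.array(values, type=to_arrow_type(udt.sqlType()))
   ```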
   
   ```python
   import time
   
   import pandas as pd
   
   from pyspark.sql.functions import pandas_udf, udf
   from pyspark.sql.types import ArrayType, FloatType, IntegerType
   
   # ExampleBox / ExampleBoxUDT are defined in this PR's test suite.
   # This runs inside a pyspark test case, so `self.spark` is the
   # active SparkSession.
   df = self.spark.range(1, 10**6, numPartitions=32)
   df = df.cache()
   df.rdd.count()  # materialize the cache before benchmarking
   
   @pandas_udf(ArrayType(ExampleBoxUDT()))
   def array_of_boxes(series: pd.Series) -> pd.Series:
       boxes = []
       for _, i in series.items():
           boxes.append([ExampleBox(*([i] * 4)), ExampleBox(*([i + 1] * 4))])
       return pd.Series(boxes)
   
   @pandas_udf(ArrayType(ArrayType(FloatType())))
   def array_of_arrays(series: pd.Series) -> pd.Series:
       boxes = []
       for _, i in series.items():
           boxes.append([[i] * 4, [i + 1] * 4])
       return pd.Series(boxes)
   
   @udf(ArrayType(ExampleBoxUDT()))
   def plain_udf(i):
       return [ExampleBox(*([i] * 4)), ExampleBox(*([i + 1] * 4))]
   
   @pandas_udf(IntegerType())
   def vectorized(series: pd.Series) -> pd.Series:
       return series + 1
   
   start = time.time()
   df.withColumn("vectorized", vectorized(df.id)).rdd.count()
   print(f"Vectorized Pandas UDF: {time.time() - start}")
   
   start = time.time()
   df.withColumn("boxes", array_of_boxes(df.id)).rdd.count()
   print(f"Pandas UDF + UDT: {time.time() - start}")
   
   start = time.time()
   df.withColumn("b", array_of_arrays(df.id)).rdd.count()
   print(f"Pandas UDF + No UDT: {time.time() - start}")
   
   start = time.time()
   df.withColumn("plain_udf", plain_udf(df.id)).rdd.count()
   print(f"Normal UDF: {time.time() - start}")
   ```
   
   It seems that no matter which UDF / Pandas UDF runs, the first one after `spark.range()` is the slowest, so for fairness I run all of the benchmarks directly against the same cached `spark.range` DataFrame. Also, the previous benchmark appears to have been skewed by `.cache()` and the input size. Here are the updated results from the above code.
   
   
   UDF | Time
   ------------ | -------------
   Pandas UDF + UDT | 20.1209s
   Pandas UDF + ArrayType | 4.1171s
   Normal UDF + UDT | 20.790s
   Pandas UDF (vectorized) | 2.2172s
   
   It should be understandable that using a UDT with `@pandas_udf` means PySpark cannot use pandas' vectorized computation. Moreover, the generated `pd.Series`, although it is already a columnar in-memory format, cannot be used directly to construct a pyarrow array, because we need to deconstruct each UDT into its `sqlType` form.
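   
   For concreteness, here is a minimal sketch of what such a UDT looks like. `Box` / `BoxUDT` are hypothetical stand-ins modeled on pyspark's `UserDefinedType` interface; the `ExampleBox` / `ExampleBoxUDT` in the benchmark above are this PR's test-suite equivalents.
   
   ```python
   from pyspark.sql.types import ArrayType, FloatType, UserDefinedType
   
   class Box:
       def __init__(self, xmin, ymin, xmax, ymax):
           self.coords = [xmin, ymin, xmax, ymax]
   
   class BoxUDT(UserDefinedType):
       @classmethod
       def sqlType(cls):
           # The physical representation that Spark (and pyarrow) understand.
           return ArrayType(FloatType())
   
       @classmethod
       def module(cls):
           return "__main__"
   
       def serialize(self, obj):
           # Deconstruct one UDT value into its sqlType form. This is the
           # per-row step that defeats pandas vectorization.
           return obj.coords
   
       def deserialize(self, datum):
           return Box(*datum)
   ```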
   
   However, as demonstrated above, generating 2M UDTs takes only 20s on a single machine. This is the worst-case scenario for the performance comparison, because there is almost no overhead other than the UDT deconstruction.
   
   But in our real-world use cases:
   
   1. We would like to use Pandas UDFs to run heavy deep learning model inference (e.g., TensorFlow or PyTorch), so that we can amortize the model initialization overhead and feed the model in batches. Also, 2M boxes could take hours to generate when running on tens of thousands of rows.
   2. A UDT implementation carries domain-specific code that cannot be baked into a `StructType`. This means that today, whenever we need such code, we have to glue a "StructType -> UDT" UDF after a pandas UDF (see the sketch after this list), which complicates the codebase and is not necessarily faster than this PR (basically "row 2 + 3 vs row 1" in the above table).
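   
   For point 2, a hedged sketch of today's two-step workaround, reusing the hypothetical `Box` / `BoxUDT` from above (the glue UDF here converts from the UDT's assumed array `sqlType` rather than from a `StructType`):
   
   ```python
   import pandas as pd
   
   from pyspark.sql.functions import pandas_udf, udf
   from pyspark.sql.types import ArrayType, FloatType
   
   @pandas_udf(ArrayType(ArrayType(FloatType())))
   def boxes_as_arrays(series: pd.Series) -> pd.Series:
       # Emit the sqlType form so the pandas UDF output stays arrow-friendly.
       return pd.Series([[[i] * 4, [i + 1] * 4] for i in series])
   
   @udf(ArrayType(BoxUDT()))
   def arrays_to_boxes(arrays):
       # Row-based glue UDF that reassembles UDTs from their sqlType form.
       return [Box(*a) for a in arrays]
   
   # Roughly what "row 2 + 3" in the table above approximates, versus the
   # single `array_of_boxes(df.id)` call from "row 1".
   df.withColumn("boxes", arrays_to_boxes(boxes_as_arrays(df.id)))
   ```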
   
   If we use UDTs in pandas UDFs with caution, they do deliver many benefits for our use cases.
   

