WeichenXu123 opened a new pull request #24643: [SPARK-26412][PySpark][SQL][WIP] Allow Pandas UDF to take an iterator of pd.DataFrames URL: https://github.com/apache/spark/pull/24643 ## What changes were proposed in this pull request? Allow Pandas UDF to take an iterator of pd.DataFrames I haven't add unit tests, but manually tests show it works fine. So it is ready for first pass review. We can test several typical cases: ``` from pyspark.sql import SparkSession from pyspark.sql.functions import pandas_udf, PandasUDFType from pyspark.sql.functions import udf from pyspark.taskcontext import TaskContext df = spark.createDataFrame([(1, 20), (3, 40)], ["a", "b"]) @pandas_udf("int", PandasUDFType.SCALAR_ITER) def fi1(it): pid = TaskContext.get().partitionId() print("DBG: fi1: do init stuff, partitionId=" + str(pid)) for batch in it: yield batch + 100 print("DBG: fi1: do close stuff, partitionId=" + str(pid)) @pandas_udf("int", PandasUDFType.SCALAR_ITER) def fi2(it): pid = TaskContext.get().partitionId() print("DBG: fi2: do init stuff, partitionId=" + str(pid)) for batch in it: yield batch + 10000 print("DBG: fi2: do close stuff, partitionId=" + str(pid)) @pandas_udf("int", PandasUDFType.SCALAR_ITER) def fi3(it): pid = TaskContext.get().partitionId() print("DBG: fi3: do init stuff, partitionId=" + str(pid)) for x, y in it: yield x + y * 10 + 100000 print("DBG: fi3: do close stuff, partitionId=" + str(pid)) @pandas_udf("int", PandasUDFType.SCALAR) def fp1(x): return x + 1000 @udf("int") def fu1(x): return x + 10 # test running sql udf/pandas udf/pandas iter udf at the same time. # Note this case the `fi1("a"), fi2("b"), fi3("a", "b")` will generate only one plan, # and `fu1("a")`, `fp1("a")` will generate another two separate plans. df.select(fu1("a"), fp1("a"), fi1("a"), fi2("b"), fi3("a", "b")).show() # test chain two pandas iter udf together # Note this case `fi2(fi1("a"))` will generate only one plan # Also note the init stuff/close stuff calling order will be like: # DBG: fi2: do init stuff, partitionId=0 # DBG: fi1: do init stuff, partitionId=0 # DBG: fi1: do close stuff, partitionId=0 # DBG: fi2: do close stuff, partitionId=0 df.select(fi2(fi1("a"))).show() # test more complex chain # Note this case `fi1("a"), fi2("a")` will generate one plan, # and `fi3(fi1_output, fi2_output)` will generate another plan df.select(fi3(fi1("a"), fi2("a"))).show() ``` ## How was this patch tested? To be added. Please review http://spark.apache.org/contributing.html before opening a pull request.
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org