This is an automated email from the ASF dual-hosted git repository.

meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8152a87  [SPARK-28978][ ] Support > 256 args to python udf
8152a87 is described below

commit 8152a87235a63a13969f7c1ff5ed038956e8ed76
Author: Bago Amirbekian <b...@databricks.com>
AuthorDate: Fri Nov 8 19:19:14 2019 -0800

    [SPARK-28978][ ] Support > 256 args to python udf
    
    ### What changes were proposed in this pull request?
    
    On the worker we express lambda functions as strings and then eval them to
    create a "mapper" function. This makes the code hard to read and limits the
    number of arguments a udf can support to 256 for Python <= 3.6.
    
    This PR rewrites the mapper functions as nested functions instead of
    "lambda strings" and allows passing in more than 255 args.
    
    ### Why are the changes needed?
    The JIRA ticket associated with this issue describes how MLflow uses UDFs
    to consume columns as features. This pattern isn't unique, and a limit of
    255 features is quite low.
    
    ### Does this PR introduce any user-facing change?
    Users can now pass more than 255 columns to a UDF.
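
    For example (a minimal usage sketch, assuming `df` has well over 255
    columns; the names are illustrative, not from this patch):

        from pyspark.sql.functions import udf
        from pyspark.sql.types import StringType

        wide_udf = udf(lambda *cols: "ok", StringType())
        # Pass every column of a very wide DataFrame to a single UDF call.
        df.select(wide_udf(*df.columns)).first()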
    
    ### How was this patch tested?
    Added a unit test for passing in > 255 args to udf.
    
    Closes #26442 from MrBago/replace-lambdas-on-worker.
    
    Authored-by: Bago Amirbekian <b...@databricks.com>
    Signed-off-by: Xiangrui Meng <m...@databricks.com>
---
 python/pyspark/sql/tests/test_udf.py | 13 ++++++++
 python/pyspark/worker.py             | 62 +++++++++++++++++-------------------
 2 files changed, 42 insertions(+), 33 deletions(-)

diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index c274dc7..3b9f12f 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -629,6 +629,19 @@ class UDFTests(ReusedSQLTestCase):
 
         self.sc.parallelize(range(1), 1).mapPartitions(task).count()
 
+    def test_udf_with_256_args(self):
+        N = 256
+        data = [["data-%d" % i for i in range(N)]] * 5
+        df = self.spark.createDataFrame(data)
+
+        def f(*a):
+            return "success"
+
+        fUdf = udf(f, StringType())
+
+        r = df.select(fUdf(*df.columns))
+        self.assertEqual(r.first()[0], "success")
+
 
 class UDFInitializationTests(unittest.TestCase):
     def tearDown(self):
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 3a1200e..bfa8d97 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -403,54 +403,50 @@ def read_udfs(pickleSer, infile, eval_type):
             idx += offsets_len
         return parsed
 
-    udfs = {}
-    call_udf = []
-    mapper_str = ""
     if eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
-        # Create function like this:
-        #   lambda a: f([a[0]], [a[0], a[1]])
-
         # We assume there is only one UDF here because grouped map doesn't
         # support combining multiple UDFs.
         assert num_udfs == 1
 
         # See FlatMapGroupsInPandasExec for how arg_offsets are used to
         # distinguish between grouping attributes and data attributes
-        arg_offsets, udf = read_single_udf(
-            pickleSer, infile, eval_type, runner_conf, udf_index=0)
-        udfs['f'] = udf
+        arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
         parsed_offsets = extract_key_value_indexes(arg_offsets)
-        keys = ["a[%d]" % (o,) for o in parsed_offsets[0][0]]
-        vals = ["a[%d]" % (o, ) for o in parsed_offsets[0][1]]
-        mapper_str = "lambda a: f([%s], [%s])" % (", ".join(keys), ", ".join(vals))
+
+        # Create function like this:
+        #   mapper a: f([a[0]], [a[0], a[1]])
+        def mapper(a):
+            keys = [a[o] for o in parsed_offsets[0][0]]
+            vals = [a[o] for o in parsed_offsets[0][1]]
+            return f(keys, vals)
     elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
         # We assume there is only one UDF here because cogrouped map doesn't
         # support combining multiple UDFs.
         assert num_udfs == 1
-        arg_offsets, udf = read_single_udf(
-            pickleSer, infile, eval_type, runner_conf, udf_index=0)
-        udfs['f'] = udf
+        arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
+
         parsed_offsets = extract_key_value_indexes(arg_offsets)
-        df1_keys = ["a[0][%d]" % (o, ) for o in parsed_offsets[0][0]]
-        df1_vals = ["a[0][%d]" % (o, ) for o in parsed_offsets[0][1]]
-        df2_keys = ["a[1][%d]" % (o, ) for o in parsed_offsets[1][0]]
-        df2_vals = ["a[1][%d]" % (o, ) for o in parsed_offsets[1][1]]
-        mapper_str = "lambda a: f([%s], [%s], [%s], [%s])" % (
-            ", ".join(df1_keys), ", ".join(df1_vals), ", ".join(df2_keys), ", ".join(df2_vals))
+
+        def mapper(a):
+            df1_keys = [a[0][o] for o in parsed_offsets[0][0]]
+            df1_vals = [a[0][o] for o in parsed_offsets[0][1]]
+            df2_keys = [a[1][o] for o in parsed_offsets[1][0]]
+            df2_vals = [a[1][o] for o in parsed_offsets[1][1]]
+            return f(df1_keys, df1_vals, df2_keys, df2_vals)
     else:
-        # Create function like this:
-        #   lambda a: (f0(a[0]), f1(a[1], a[2]), f2(a[3]))
-        # In the special case of a single UDF this will return a single result rather
-        # than a tuple of results; this is the format that the JVM side expects.
+        udfs = []
         for i in range(num_udfs):
-            arg_offsets, udf = read_single_udf(
-                pickleSer, infile, eval_type, runner_conf, udf_index=i)
-            udfs['f%d' % i] = udf
-            args = ["a[%d]" % o for o in arg_offsets]
-            call_udf.append("f%d(%s)" % (i, ", ".join(args)))
-        mapper_str = "lambda a: (%s)" % (", ".join(call_udf))
-
-    mapper = eval(mapper_str, udfs)
+            udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
+
+        def mapper(a):
+            result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
+            # In the special case of a single UDF this will return a single result rather
+            # than a tuple of results; this is the format that the JVM side expects.
+            if len(result) == 1:
+                return result[0]
+            else:
+                return result
+
     func = lambda _, it: map(mapper, it)
 
     # profiling is not supported for UDF


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
