This is an automated email from the ASF dual-hosted git repository.

meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 8152a87  [SPARK-28978][ ] Support > 256 args to python udf
8152a87 is described below

commit 8152a87235a63a13969f7c1ff5ed038956e8ed76
Author: Bago Amirbekian <b...@databricks.com>
AuthorDate: Fri Nov 8 19:19:14 2019 -0800

    [SPARK-28978][ ] Support > 256 args to python udf

    ### What changes were proposed in this pull request?

    On the worker we express lambda functions as strings and then eval them to
    create a "mapper" function. This makes the code hard to read & limits the
    # of arguments a udf can support to 256 for python <= 3.6. This PR rewrites
    the mapper functions as nested functions instead of "lambda strings" and
    allows passing in more than 255 args.

    ### Why are the changes needed?

    The jira ticket associated with this issue describes how MLflow uses udfs
    to consume columns as features. This pattern isn't unique and a limit of
    255 features is quite low.

    ### Does this PR introduce any user-facing change?

    Users can now pass more than 255 cols to a udf function.

    ### How was this patch tested?

    Added a unit test for passing in > 255 args to udf.

    Closes #26442 from MrBago/replace-lambdas-on-worker.

    Authored-by: Bago Amirbekian <b...@databricks.com>
    Signed-off-by: Xiangrui Meng <m...@databricks.com>
---
 python/pyspark/sql/tests/test_udf.py | 13 ++++++++
 python/pyspark/worker.py             | 62 +++++++++++++++++-------------------
 2 files changed, 42 insertions(+), 33 deletions(-)

diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index c274dc7..3b9f12f 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -629,6 +629,19 @@ class UDFTests(ReusedSQLTestCase):
 
         self.sc.parallelize(range(1), 1).mapPartitions(task).count()
 
+    def test_udf_with_256_args(self):
+        N = 256
+        data = [["data-%d" % i for i in range(N)]] * 5
+        df = self.spark.createDataFrame(data)
+
+        def f(*a):
+            return "success"
+
+        fUdf = udf(f, StringType())
+
+        r = df.select(fUdf(*df.columns))
+        self.assertEqual(r.first()[0], "success")
+
 
 class UDFInitializationTests(unittest.TestCase):
     def tearDown(self):
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 3a1200e..bfa8d97 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -403,54 +403,50 @@ def read_udfs(pickleSer, infile, eval_type):
             idx += offsets_len
         return parsed
 
-    udfs = {}
-    call_udf = []
-    mapper_str = ""
     if eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
-        # Create function like this:
-        # lambda a: f([a[0]], [a[0], a[1]])
-
         # We assume there is only one UDF here because grouped map doesn't
         # support combining multiple UDFs.
         assert num_udfs == 1
 
         # See FlatMapGroupsInPandasExec for how arg_offsets are used to
         # distinguish between grouping attributes and data attributes
-        arg_offsets, udf = read_single_udf(
-            pickleSer, infile, eval_type, runner_conf, udf_index=0)
-        udfs['f'] = udf
+        arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
         parsed_offsets = extract_key_value_indexes(arg_offsets)
-        keys = ["a[%d]" % (o,) for o in parsed_offsets[0][0]]
-        vals = ["a[%d]" % (o, ) for o in parsed_offsets[0][1]]
-        mapper_str = "lambda a: f([%s], [%s])" % (", ".join(keys), ", ".join(vals))
+
+        # Create function like this:
+        # mapper a: f([a[0]], [a[0], a[1]])
+        def mapper(a):
+            keys = [a[o] for o in parsed_offsets[0][0]]
+            vals = [a[o] for o in parsed_offsets[0][1]]
+            return f(keys, vals)
     elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
         # We assume there is only one UDF here because cogrouped map doesn't
         # support combining multiple UDFs.
         assert num_udfs == 1
-        arg_offsets, udf = read_single_udf(
-            pickleSer, infile, eval_type, runner_conf, udf_index=0)
-        udfs['f'] = udf
+        arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
+
         parsed_offsets = extract_key_value_indexes(arg_offsets)
-        df1_keys = ["a[0][%d]" % (o, ) for o in parsed_offsets[0][0]]
-        df1_vals = ["a[0][%d]" % (o, ) for o in parsed_offsets[0][1]]
-        df2_keys = ["a[1][%d]" % (o, ) for o in parsed_offsets[1][0]]
-        df2_vals = ["a[1][%d]" % (o, ) for o in parsed_offsets[1][1]]
-        mapper_str = "lambda a: f([%s], [%s], [%s], [%s])" % (
-            ", ".join(df1_keys), ", ".join(df1_vals), ", ".join(df2_keys), ", ".join(df2_vals))
+
+        def mapper(a):
+            df1_keys = [a[0][o] for o in parsed_offsets[0][0]]
+            df1_vals = [a[0][o] for o in parsed_offsets[0][1]]
+            df2_keys = [a[1][o] for o in parsed_offsets[1][0]]
+            df2_vals = [a[1][o] for o in parsed_offsets[1][1]]
+            return f(df1_keys, df1_vals, df2_keys, df2_vals)
     else:
-        # Create function like this:
-        # lambda a: (f0(a[0]), f1(a[1], a[2]), f2(a[3]))
-        # In the special case of a single UDF this will return a single result rather
-        # than a tuple of results; this is the format that the JVM side expects.
+        udfs = []
         for i in range(num_udfs):
-            arg_offsets, udf = read_single_udf(
-                pickleSer, infile, eval_type, runner_conf, udf_index=i)
-            udfs['f%d' % i] = udf
-            args = ["a[%d]" % o for o in arg_offsets]
-            call_udf.append("f%d(%s)" % (i, ", ".join(args)))
-        mapper_str = "lambda a: (%s)" % (", ".join(call_udf))
-
-    mapper = eval(mapper_str, udfs)
+            udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
+
+        def mapper(a):
+            result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
+            # In the special case of a single UDF this will return a single result rather
+            # than a tuple of results; this is the format that the JVM side expects.
+            if len(result) == 1:
+                return result[0]
+            else:
+                return result
+
     func = lambda _, it: map(mapper, it)
 
     # profiling is not supported for UDF

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
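
A minimal standalone sketch of the pattern described in the commit message above,
not part of the commit itself. The old worker code generated a "lambda string" and
eval'd it, so a UDF over many columns produced a call like f(a[0], ..., a[255]),
which CPython 3.6 and earlier rejects with "SyntaxError: more than 255 arguments";
the new code builds the mapper as a nested function that unpacks a list with *,
which has no such compile-time limit. The helper names below
(make_mapper_from_string, make_mapper_nested, count_args) are hypothetical
illustrations, not Spark APIs.

# Illustrative only -- contrasting the two mapper-building strategies.

def make_mapper_from_string(f, arg_offsets):
    # Old style: generate Python source and eval() it. The generated call lists
    # every argument explicitly, so CPython <= 3.6 raises
    # "SyntaxError: more than 255 arguments" once len(arg_offsets) > 255.
    args = ", ".join("a[%d]" % o for o in arg_offsets)
    return eval("lambda a: f(%s)" % args, {"f": f})


def make_mapper_nested(f, arg_offsets):
    # New style: a real closure that unpacks a list with *, which is easier to
    # read and debug and has no per-call argument limit at compile time.
    def mapper(a):
        return f(*[a[o] for o in arg_offsets])
    return mapper


if __name__ == "__main__":
    offsets = list(range(300))                 # more than 255 "columns"
    row = ["data-%d" % i for i in offsets]
    count_args = lambda *xs: len(xs)

    print(make_mapper_nested(count_args, offsets)(row))   # prints 300
    # make_mapper_from_string(count_args, offsets) would raise SyntaxError on
    # Python <= 3.6; Python 3.7+ lifted the limit, but the eval'd string is
    # still harder to read and to step through in a debugger.

This mirrors what read_udfs in worker.py now does in the diff: each UDF's
(arg_offsets, f) pair is applied via f(*[a[o] for o in arg_offsets]) inside a
nested mapper, so the number of columns passed to a udf is no longer bounded by
the 255-argument call-syntax limit.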