Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r172249841
  
    --- Diff: python/pyspark/worker.py ---
    @@ -149,18 +156,30 @@ def read_udfs(pickleSer, infile, eval_type):
         num_udfs = read_int(infile)
         udfs = {}
         call_udf = []
    -    for i in range(num_udfs):
    +    mapper_str = ""
    +    if eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
    +        # Create function like this:
    +        #   lambda a: f([a[0]], [a[0], a[1]])
    +        assert num_udfs == 1
             arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type)
    -        udfs['f%d' % i] = udf
    -        args = ["a[%d]" % o for o in arg_offsets]
    -        call_udf.append("f%d(%s)" % (i, ", ".join(args)))
    -    # Create function like this:
    -    #   lambda a: (f0(a0), f1(a1, a2), f2(a3))
    -    # In the special case of a single UDF this will return a single result rather
    -    # than a tuple of results; this is the format that the JVM side expects.
    -    mapper_str = "lambda a: (%s)" % (", ".join(call_udf))
    -    mapper = eval(mapper_str, udfs)
    +        udfs['f'] = udf
    +        split_offset = arg_offsets[0] + 1
    +        arg0 = ["a[%d]" % o for o in arg_offsets[1: split_offset]]
    +        arg1 = ["a[%d]" % o for o in arg_offsets[split_offset:]]
    +        mapper_str = ("lambda a: f([%s], [%s])" % (", ".join(arg0), ", ".join(arg1)))
    +    else:
    +        # Create function like this:
    +        #   lambda a: (f0(a[0]), f1(a[1], a[2]), f2(a[3]))
    +        # In the special case of a single UDF this will return a single result rather
    +        # than a tuple of results; this is the format that the JVM side expects.
    +        for i in range(num_udfs):
    +            arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type)
    +            udfs['f%d' % i] = udf
    +            args = ["a[%d]" % o for o in arg_offsets]
    +            call_udf.append("f%d(%s)" % (i, ", ".join(args)))
    +            mapper_str = "lambda a: (%s)" % (", ".join(call_udf))
    --- End diff --
    
    Fixed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to