Github user icexelloss commented on a diff in the pull request:
https://github.com/apache/spark/pull/19630#discussion_r150662476
--- Diff: python/pyspark/sql/functions.py ---
@@ -2049,132 +2051,13 @@ def map_values(col):
 # ---------------------------- User Defined Function ----------------------------------
-def _wrap_function(sc, func, returnType):
-    command = (func, returnType)
-    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
-    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
-                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)
-
-
-class PythonUdfType(object):
-    # row-at-a-time UDFs
-    NORMAL_UDF = 0
-    # scalar vectorized UDFs
-    PANDAS_UDF = 1
-    # grouped vectorized UDFs
-    PANDAS_GROUPED_UDF = 2
-
-
-class UserDefinedFunction(object):
-    """
-    User defined function in Python
-
-    .. versionadded:: 1.3
-    """
-    def __init__(self, func, returnType, name=None, pythonUdfType=PythonUdfType.NORMAL_UDF):
-        if not callable(func):
-            raise TypeError(
-                "Not a function or callable (__call__ is not defined): "
-                "{0}".format(type(func)))
-
-        self.func = func
-        self._returnType = returnType
-        # Stores UserDefinedPythonFunctions jobj, once initialized
-        self._returnType_placeholder = None
-        self._judf_placeholder = None
-        self._name = name or (
-            func.__name__ if hasattr(func, '__name__')
-            else func.__class__.__name__)
-        self.pythonUdfType = pythonUdfType
-
-    @property
-    def returnType(self):
-        # This makes sure this is called after SparkContext is initialized.
-        # ``_parse_datatype_string`` accesses to JVM for parsing a DDL formatted string.
-        if self._returnType_placeholder is None:
-            if isinstance(self._returnType, DataType):
-                self._returnType_placeholder = self._returnType
-            else:
-                self._returnType_placeholder = _parse_datatype_string(self._returnType)
-        return self._returnType_placeholder
-
-    @property
-    def _judf(self):
-        # It is possible that concurrent access, to newly created UDF,
-        # will initialize multiple UserDefinedPythonFunctions.
-        # This is unlikely, doesn't affect correctness,
-        # and should have a minimal performance impact.
-        if self._judf_placeholder is None:
-            self._judf_placeholder = self._create_judf()
-        return self._judf_placeholder
-
-    def _create_judf(self):
-        from pyspark.sql import SparkSession
-
-        spark = SparkSession.builder.getOrCreate()
-        sc = spark.sparkContext
-
-        wrapped_func = _wrap_function(sc, self.func, self.returnType)
-        jdt = spark._jsparkSession.parseDataType(self.returnType.json())
-        judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
-            self._name, wrapped_func, jdt, self.pythonUdfType)
-        return judf
-
-    def __call__(self, *cols):
-        judf = self._judf
-        sc = SparkContext._active_spark_context
-        return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
-
-    def _wrapped(self):
-        """
-        Wrap this udf with a function and attach docstring from func
-        """
-
-        # It is possible for a callable instance without __name__ attribute or/and
-        # __module__ attribute to be wrapped here. For example, functools.partial. In this case,
-        # we should avoid wrapping the attributes from the wrapped function to the wrapper
-        # function. So, we take out these attribute names from the default names to set and
-        # then manually assign it after being wrapped.
-        assignments = tuple(
-            a for a in functools.WRAPPER_ASSIGNMENTS if a != '__name__' and a != '__module__')
-
-        @functools.wraps(self.func, assigned=assignments)
-        def wrapper(*args):
-            return self(*args)
-
-        wrapper.__name__ = self._name
-        wrapper.__module__ = (self.func.__module__ if hasattr(self.func, '__module__')
-                              else self.func.__class__.__module__)
-
-        wrapper.func = self.func
-        wrapper.returnType = self.returnType
-        wrapper.pythonUdfType = self.pythonUdfType
-
-        return wrapper
-
-
-def _create_udf(f, returnType, pythonUdfType):
-
-    def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
-        if pythonUdfType == PythonUdfType.PANDAS_UDF:
-            import inspect
-            argspec = inspect.getargspec(f)
-            if len(argspec.args) == 0 and argspec.varargs is None:
-                raise ValueError(
-                    "0-arg pandas_udfs are not supported. "
-                    "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
-                )
-        udf_obj = UserDefinedFunction(f, returnType, pythonUdfType=pythonUdfType)
-        return udf_obj._wrapped()
-
-    # decorator @udf, @udf(), @udf(dataType()), or similar with @pandas_udf
-    if f is None or isinstance(f, (str, DataType)):
-        # If DataType has been passed as a positional argument
-        # for decorator use it as a returnType
-        return_type = f or returnType
-        return functools.partial(_udf, returnType=return_type, pythonUdfType=pythonUdfType)
-    else:
-        return _udf(f=f, returnType=returnType, pythonUdfType=pythonUdfType)
+class PandasUDFType(enum.Enum):
--- End diff --
This is in the Python standard library since Python 3.4. It's not built in for Python 2.x, but it is available through the "enum34" package.
There are other ways to implement an enum without enum34, but they are all either complicated (using type()) or not as type-safe (using ints).
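
For illustration only, a minimal sketch of the trade-off described above; the member names (SCALAR, GROUP_MAP) are placeholders and not necessarily what this PR defines:

```python
import enum  # stdlib on Python 3.4+; on Python 2.x provided by the "enum34" backport


# The enum.Enum approach: members are distinct, typed singletons.
class PandasUDFType(enum.Enum):
    SCALAR = 1
    GROUP_MAP = 2


# The plain-int alternative: works everywhere without enum34, but any integer
# is accepted where a UDF type is expected, so mistakes are not caught.
class PandasUDFTypeInts(object):
    SCALAR = 1
    GROUP_MAP = 2


if __name__ == "__main__":
    # enum.Enum gives explicit, type-safe membership and identity checks.
    assert PandasUDFType(1) is PandasUDFType.SCALAR
    assert isinstance(PandasUDFType.SCALAR, PandasUDFType)
    # With ints there is nothing distinguishing a UDF type from any other int.
    assert PandasUDFTypeInts.SCALAR == 1
```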