asfgit closed pull request #23314: [SPARK-26364][PYTHON][TESTING] Clean up
imports in test_pandas_udf*
URL: https://github.com/apache/spark/pull/23314
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
Because this pull request comes from a fork, the diff is supplied
below (GitHub would not otherwise display it):
diff --git a/python/pyspark/sql/tests/test_pandas_udf.py
b/python/pyspark/sql/tests/test_pandas_udf.py
index c4b5478a7e893..d4d9679649ee9 100644
--- a/python/pyspark/sql/tests/test_pandas_udf.py
+++ b/python/pyspark/sql/tests/test_pandas_udf.py
@@ -17,12 +17,16 @@
import unittest
+from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
from pyspark.sql.types import *
from pyspark.sql.utils import ParseException
+from pyspark.rdd import PythonEvalType
from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas,
have_pyarrow, \
pandas_requirement_message, pyarrow_requirement_message
from pyspark.testing.utils import QuietTest
+from py4j.protocol import Py4JJavaError
+
@unittest.skipIf(
not have_pandas or not have_pyarrow,
@@ -30,9 +34,6 @@
class PandasUDFTests(ReusedSQLTestCase):
def test_pandas_udf_basic(self):
- from pyspark.rdd import PythonEvalType
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
udf = pandas_udf(lambda x: x, DoubleType())
self.assertEqual(udf.returnType, DoubleType())
self.assertEqual(udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
@@ -65,10 +66,6 @@ def test_pandas_udf_basic(self):
self.assertEqual(udf.evalType,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
def test_pandas_udf_decorator(self):
- from pyspark.rdd import PythonEvalType
- from pyspark.sql.functions import pandas_udf, PandasUDFType
- from pyspark.sql.types import StructType, StructField, DoubleType
-
@pandas_udf(DoubleType())
def foo(x):
return x
@@ -114,8 +111,6 @@ def foo(x):
self.assertEqual(foo.evalType,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
def test_udf_wrong_arg(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
with QuietTest(self.sc):
with self.assertRaises(ParseException):
@pandas_udf('blah')
@@ -151,9 +146,6 @@ def foo(k, v, w):
return k
def test_stopiteration_in_udf(self):
- from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
- from py4j.protocol import Py4JJavaError
-
def foo(x):
raise StopIteration()
diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
index 5383704434c85..18264ead2fd08 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
@@ -17,6 +17,9 @@
import unittest
+from pyspark.rdd import PythonEvalType
+from pyspark.sql.functions import array, explode, col, lit, mean, sum, \
+ udf, pandas_udf, PandasUDFType
from pyspark.sql.types import *
from pyspark.sql.utils import AnalysisException
from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas,
have_pyarrow, \
@@ -31,7 +34,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
@property
def data(self):
- from pyspark.sql.functions import array, explode, col, lit
return self.spark.range(10).toDF('id') \
.withColumn("vs", array([lit(i * 1.0) + col('id') for i in
range(20, 30)])) \
.withColumn("v", explode(col('vs'))) \
@@ -40,8 +42,6 @@ def data(self):
@property
def python_plus_one(self):
- from pyspark.sql.functions import udf
-
@udf('double')
def plus_one(v):
assert isinstance(v, (int, float))
@@ -51,7 +51,6 @@ def plus_one(v):
@property
def pandas_scalar_plus_two(self):
import pandas as pd
- from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf('double', PandasUDFType.SCALAR)
def plus_two(v):
@@ -61,8 +60,6 @@ def plus_two(v):
@property
def pandas_agg_mean_udf(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def avg(v):
return v.mean()
@@ -70,8 +67,6 @@ def avg(v):
@property
def pandas_agg_sum_udf(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def sum(v):
return v.sum()
@@ -80,7 +75,6 @@ def sum(v):
@property
def pandas_agg_weighted_mean_udf(self):
import numpy as np
- from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def weighted_mean(v, w):
@@ -88,8 +82,6 @@ def weighted_mean(v, w):
return weighted_mean
def test_manual(self):
- from pyspark.sql.functions import pandas_udf, array
-
df = self.data
sum_udf = self.pandas_agg_sum_udf
mean_udf = self.pandas_agg_mean_udf
@@ -118,8 +110,6 @@ def test_manual(self):
self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
def test_basic(self):
- from pyspark.sql.functions import col, lit, mean
-
df = self.data
weighted_mean_udf = self.pandas_agg_weighted_mean_udf
@@ -150,9 +140,6 @@ def test_basic(self):
self.assertPandasEqual(expected4.toPandas(), result4.toPandas())
def test_unsupported_types(self):
- from pyspark.sql.types import DoubleType, MapType
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
with QuietTest(self.sc):
with self.assertRaisesRegexp(NotImplementedError, 'not supported'):
pandas_udf(
@@ -173,8 +160,6 @@ def mean_and_std_udf(v):
return {v.mean(): v.std()}
def test_alias(self):
- from pyspark.sql.functions import mean
-
df = self.data
mean_udf = self.pandas_agg_mean_udf
@@ -187,8 +172,6 @@ def test_mixed_sql(self):
"""
Test mixing group aggregate pandas UDF with sql expression.
"""
- from pyspark.sql.functions import sum
-
df = self.data
sum_udf = self.pandas_agg_sum_udf
@@ -225,8 +208,6 @@ def test_mixed_udfs(self):
"""
Test mixing group aggregate pandas UDF with python UDF and scalar
pandas UDF.
"""
- from pyspark.sql.functions import sum
-
df = self.data
plus_one = self.python_plus_one
plus_two = self.pandas_scalar_plus_two
@@ -292,8 +273,6 @@ def test_multiple_udfs(self):
"""
Test multiple group aggregate pandas UDFs in one agg function.
"""
- from pyspark.sql.functions import sum, mean
-
df = self.data
mean_udf = self.pandas_agg_mean_udf
sum_udf = self.pandas_agg_sum_udf
@@ -315,8 +294,6 @@ def test_multiple_udfs(self):
self.assertPandasEqual(expected1, result1)
def test_complex_groupby(self):
- from pyspark.sql.functions import sum
-
df = self.data
sum_udf = self.pandas_agg_sum_udf
plus_one = self.python_plus_one
@@ -359,8 +336,6 @@ def test_complex_groupby(self):
self.assertPandasEqual(expected7.toPandas(), result7.toPandas())
def test_complex_expressions(self):
- from pyspark.sql.functions import col, sum
-
df = self.data
plus_one = self.python_plus_one
plus_two = self.pandas_scalar_plus_two
@@ -434,7 +409,6 @@ def test_complex_expressions(self):
self.assertPandasEqual(expected3, result3)
def test_retain_group_columns(self):
- from pyspark.sql.functions import sum
with self.sql_conf({"spark.sql.retainGroupColumns": False}):
df = self.data
sum_udf = self.pandas_agg_sum_udf
@@ -444,8 +418,6 @@ def test_retain_group_columns(self):
self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
def test_array_type(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
df = self.data
array_udf = pandas_udf(lambda x: [1.0, 2.0], 'array<double>',
PandasUDFType.GROUPED_AGG)
@@ -453,8 +425,6 @@ def test_array_type(self):
self.assertEquals(result1.first()['v2'], [1.0, 2.0])
def test_invalid_args(self):
- from pyspark.sql.functions import mean
-
df = self.data
plus_one = self.python_plus_one
mean_udf = self.pandas_agg_mean_udf
@@ -478,9 +448,6 @@ def test_invalid_args(self):
df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()
def test_register_vectorized_udf_basic(self):
- from pyspark.sql.functions import pandas_udf
- from pyspark.rdd import PythonEvalType
-
sum_pandas_udf = pandas_udf(
lambda v: v.sum(), "integer",
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)
diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
index a12c608dff9dd..80e70349b78d3 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
@@ -18,7 +18,12 @@
import datetime
import unittest
+from collections import OrderedDict
+from decimal import Decimal
+from distutils.version import LooseVersion
+
from pyspark.sql import Row
+from pyspark.sql.functions import array, explode, col, lit, udf, sum,
pandas_udf, PandasUDFType
from pyspark.sql.types import *
from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas,
have_pyarrow, \
pandas_requirement_message, pyarrow_requirement_message
@@ -32,16 +37,12 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
@property
def data(self):
- from pyspark.sql.functions import array, explode, col, lit
return self.spark.range(10).toDF('id') \
.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
.withColumn("v", explode(col('vs'))).drop('vs')
def test_supported_types(self):
- from decimal import Decimal
- from distutils.version import LooseVersion
import pyarrow as pa
- from pyspark.sql.functions import pandas_udf, PandasUDFType
values = [
1, 2, 3,
@@ -131,8 +132,6 @@ def test_supported_types(self):
self.assertPandasEqual(expected3, result3)
def test_array_type_correct(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType, array, col
-
df = self.data.withColumn("arr", array(col("id"))).repartition(1, "id")
output_schema = StructType(
@@ -151,8 +150,6 @@ def test_array_type_correct(self):
self.assertPandasEqual(expected, result)
def test_register_grouped_map_udf(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUPED_MAP)
with QuietTest(self.sc):
with self.assertRaisesRegexp(
@@ -161,7 +158,6 @@ def test_register_grouped_map_udf(self):
self.spark.catalog.registerFunction("foo_udf", foo_udf)
def test_decorator(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
df = self.data
@pandas_udf(
@@ -176,7 +172,6 @@ def foo(pdf):
self.assertPandasEqual(expected, result)
def test_coerce(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
df = self.data
foo = pandas_udf(
@@ -191,7 +186,6 @@ def test_coerce(self):
self.assertPandasEqual(expected, result)
def test_complex_groupby(self):
- from pyspark.sql.functions import pandas_udf, col, PandasUDFType
df = self.data
@pandas_udf(
@@ -210,7 +204,6 @@ def normalize(pdf):
self.assertPandasEqual(expected, result)
def test_empty_groupby(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
df = self.data
@pandas_udf(
@@ -229,7 +222,6 @@ def normalize(pdf):
self.assertPandasEqual(expected, result)
def test_datatype_string(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
df = self.data
foo_udf = pandas_udf(
@@ -243,8 +235,6 @@ def test_datatype_string(self):
self.assertPandasEqual(expected, result)
def test_wrong_return_type(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
with QuietTest(self.sc):
with self.assertRaisesRegexp(
NotImplementedError,
@@ -255,7 +245,6 @@ def test_wrong_return_type(self):
PandasUDFType.GROUPED_MAP)
def test_wrong_args(self):
- from pyspark.sql.functions import udf, pandas_udf, sum, PandasUDFType
df = self.data
with QuietTest(self.sc):
@@ -277,9 +266,7 @@ def test_wrong_args(self):
pandas_udf(lambda x, y: x, DoubleType(),
PandasUDFType.SCALAR))
def test_unsupported_types(self):
- from distutils.version import LooseVersion
import pyarrow as pa
- from pyspark.sql.functions import pandas_udf, PandasUDFType
common_err_msg = 'Invalid returnType.*grouped map Pandas UDF.*'
unsupported_types = [
@@ -300,7 +287,6 @@ def test_unsupported_types(self):
# Regression test for SPARK-23314
def test_timestamp_dst(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
# Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00
am
dt = [datetime.datetime(2015, 11, 1, 0, 30),
datetime.datetime(2015, 11, 1, 1, 30),
@@ -311,12 +297,12 @@ def test_timestamp_dst(self):
self.assertPandasEqual(df.toPandas(), result.toPandas())
def test_udf_with_key(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
+ import numpy as np
+
df = self.data
pdf = df.toPandas()
def foo1(key, pdf):
- import numpy as np
assert type(key) == tuple
assert type(key[0]) == np.int64
@@ -326,7 +312,6 @@ def foo1(key, pdf):
v4=pdf.v * pdf.id.mean())
def foo2(key, pdf):
- import numpy as np
assert type(key) == tuple
assert type(key[0]) == np.int64
assert type(key[1]) == np.int32
@@ -385,9 +370,7 @@ def foo3(key, pdf):
self.assertPandasEqual(expected4, result4)
def test_column_order(self):
- from collections import OrderedDict
import pandas as pd
- from pyspark.sql.functions import pandas_udf, PandasUDFType
# Helper function to set column names from a list
def rename_pdf(pdf, names):
@@ -468,7 +451,6 @@ def invalid_positional_types(pdf):
with QuietTest(self.sc):
with self.assertRaisesRegexp(Exception, "KeyError: 'id'"):
grouped_df.apply(column_name_typo).collect()
- from distutils.version import LooseVersion
import pyarrow as pa
if LooseVersion(pa.__version__) < LooseVersion("0.11.0"):
# TODO: see ARROW-1949. Remove when the minimum PyArrow
version becomes 0.11.0.
@@ -480,7 +462,6 @@ def invalid_positional_types(pdf):
def test_positional_assignment_conf(self):
import pandas as pd
- from pyspark.sql.functions import pandas_udf, PandasUDFType
with self.sql_conf({
"spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName": False}):
@@ -496,9 +477,7 @@ def foo(_):
self.assertEqual(r.b, 1)
def test_self_join_with_pandas(self):
- import pyspark.sql.functions as F
-
- @F.pandas_udf('key long, col string', F.PandasUDFType.GROUPED_MAP)
+ @pandas_udf('key long, col string', PandasUDFType.GROUPED_MAP)
def dummy_pandas_udf(df):
return df[['key', 'col']]
@@ -508,12 +487,11 @@ def dummy_pandas_udf(df):
# this was throwing an AnalysisException before SPARK-24208
res = df_with_pandas.alias('temp0').join(df_with_pandas.alias('temp1'),
- F.col('temp0.key') ==
F.col('temp1.key'))
+ col('temp0.key') ==
col('temp1.key'))
self.assertEquals(res.count(), 5)
def test_mixed_scalar_udfs_followed_by_grouby_apply(self):
import pandas as pd
- from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
df = self.spark.range(0, 10).toDF('v1')
df = df.withColumn('v2', udf(lambda x: x + 1, 'int')(df['v1'])) \
diff --git a/python/pyspark/sql/tests/test_pandas_udf_scalar.py
b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
index 2f585a3725988..6a6865a9fb16d 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_scalar.py
@@ -16,12 +16,20 @@
#
import datetime
import os
+import random
import shutil
import sys
import tempfile
import time
import unittest
+from datetime import date, datetime
+from decimal import Decimal
+from distutils.version import LooseVersion
+
+from pyspark.rdd import PythonEvalType
+from pyspark.sql import Column
+from pyspark.sql.functions import array, col, expr, lit, sum, udf, pandas_udf
from pyspark.sql.types import Row
from pyspark.sql.types import *
from pyspark.sql.utils import AnalysisException
@@ -59,18 +67,16 @@ def tearDownClass(cls):
@property
def nondeterministic_vectorized_udf(self):
- from pyspark.sql.functions import pandas_udf
+ import pandas as pd
+ import numpy as np
@pandas_udf('double')
def random_udf(v):
- import pandas as pd
- import numpy as np
return pd.Series(np.random.random(len(v)))
random_udf = random_udf.asNondeterministic()
return random_udf
def test_pandas_udf_tokenize(self):
- from pyspark.sql.functions import pandas_udf
tokenize = pandas_udf(lambda s: s.apply(lambda str: str.split(' ')),
ArrayType(StringType()))
self.assertEqual(tokenize.returnType, ArrayType(StringType()))
@@ -79,7 +85,6 @@ def test_pandas_udf_tokenize(self):
self.assertEqual([Row(hi=[u'hi', u'boo']), Row(hi=[u'bye', u'boo'])],
result.collect())
def test_pandas_udf_nested_arrays(self):
- from pyspark.sql.functions import pandas_udf
tokenize = pandas_udf(lambda s: s.apply(lambda str: [str.split(' ')]),
ArrayType(ArrayType(StringType())))
self.assertEqual(tokenize.returnType,
ArrayType(ArrayType(StringType())))
@@ -88,7 +93,6 @@ def test_pandas_udf_nested_arrays(self):
self.assertEqual([Row(hi=[[u'hi', u'boo']]), Row(hi=[[u'bye',
u'boo']])], result.collect())
def test_vectorized_udf_basic(self):
- from pyspark.sql.functions import pandas_udf, col, array
df = self.spark.range(10).select(
col('id').cast('string').alias('str'),
col('id').cast('int').alias('int'),
@@ -114,9 +118,6 @@ def test_vectorized_udf_basic(self):
self.assertEquals(df.collect(), res.collect())
def test_register_nondeterministic_vectorized_udf_basic(self):
- from pyspark.sql.functions import pandas_udf
- from pyspark.rdd import PythonEvalType
- import random
random_pandas_udf = pandas_udf(
lambda x: random.randint(6, 6) + x,
IntegerType()).asNondeterministic()
self.assertEqual(random_pandas_udf.deterministic, False)
@@ -129,7 +130,6 @@ def
test_register_nondeterministic_vectorized_udf_basic(self):
self.assertEqual(row[0], 7)
def test_vectorized_udf_null_boolean(self):
- from pyspark.sql.functions import pandas_udf, col
data = [(True,), (True,), (None,), (False,)]
schema = StructType().add("bool", BooleanType())
df = self.spark.createDataFrame(data, schema)
@@ -138,7 +138,6 @@ def test_vectorized_udf_null_boolean(self):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_null_byte(self):
- from pyspark.sql.functions import pandas_udf, col
data = [(None,), (2,), (3,), (4,)]
schema = StructType().add("byte", ByteType())
df = self.spark.createDataFrame(data, schema)
@@ -147,7 +146,6 @@ def test_vectorized_udf_null_byte(self):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_null_short(self):
- from pyspark.sql.functions import pandas_udf, col
data = [(None,), (2,), (3,), (4,)]
schema = StructType().add("short", ShortType())
df = self.spark.createDataFrame(data, schema)
@@ -156,7 +154,6 @@ def test_vectorized_udf_null_short(self):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_null_int(self):
- from pyspark.sql.functions import pandas_udf, col
data = [(None,), (2,), (3,), (4,)]
schema = StructType().add("int", IntegerType())
df = self.spark.createDataFrame(data, schema)
@@ -165,7 +162,6 @@ def test_vectorized_udf_null_int(self):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_null_long(self):
- from pyspark.sql.functions import pandas_udf, col
data = [(None,), (2,), (3,), (4,)]
schema = StructType().add("long", LongType())
df = self.spark.createDataFrame(data, schema)
@@ -174,7 +170,6 @@ def test_vectorized_udf_null_long(self):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_null_float(self):
- from pyspark.sql.functions import pandas_udf, col
data = [(3.0,), (5.0,), (-1.0,), (None,)]
schema = StructType().add("float", FloatType())
df = self.spark.createDataFrame(data, schema)
@@ -183,7 +178,6 @@ def test_vectorized_udf_null_float(self):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_null_double(self):
- from pyspark.sql.functions import pandas_udf, col
data = [(3.0,), (5.0,), (-1.0,), (None,)]
schema = StructType().add("double", DoubleType())
df = self.spark.createDataFrame(data, schema)
@@ -192,8 +186,6 @@ def test_vectorized_udf_null_double(self):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_null_decimal(self):
- from decimal import Decimal
- from pyspark.sql.functions import pandas_udf, col
data = [(Decimal(3.0),), (Decimal(5.0),), (Decimal(-1.0),), (None,)]
schema = StructType().add("decimal", DecimalType(38, 18))
df = self.spark.createDataFrame(data, schema)
@@ -202,7 +194,6 @@ def test_vectorized_udf_null_decimal(self):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_null_string(self):
- from pyspark.sql.functions import pandas_udf, col
data = [("foo",), (None,), ("bar",), ("bar",)]
schema = StructType().add("str", StringType())
df = self.spark.createDataFrame(data, schema)
@@ -211,7 +202,6 @@ def test_vectorized_udf_null_string(self):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_string_in_udf(self):
- from pyspark.sql.functions import pandas_udf, col
import pandas as pd
df = self.spark.range(10)
str_f = pandas_udf(lambda x: pd.Series(map(str, x)), StringType())
@@ -220,7 +210,6 @@ def test_vectorized_udf_string_in_udf(self):
self.assertEquals(expected.collect(), actual.collect())
def test_vectorized_udf_datatype_string(self):
- from pyspark.sql.functions import pandas_udf, col
df = self.spark.range(10).select(
col('id').cast('string').alias('str'),
col('id').cast('int').alias('int'),
@@ -244,9 +233,8 @@ def test_vectorized_udf_datatype_string(self):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_null_binary(self):
- from distutils.version import LooseVersion
import pyarrow as pa
- from pyspark.sql.functions import pandas_udf, col
+
if LooseVersion(pa.__version__) < LooseVersion("0.10.0"):
with QuietTest(self.sc):
with self.assertRaisesRegexp(
@@ -262,7 +250,6 @@ def test_vectorized_udf_null_binary(self):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_array_type(self):
- from pyspark.sql.functions import pandas_udf, col
data = [([1, 2],), ([3, 4],)]
array_schema = StructType([StructField("array",
ArrayType(IntegerType()))])
df = self.spark.createDataFrame(data, schema=array_schema)
@@ -271,7 +258,6 @@ def test_vectorized_udf_array_type(self):
self.assertEquals(df.collect(), result.collect())
def test_vectorized_udf_null_array(self):
- from pyspark.sql.functions import pandas_udf, col
data = [([1, 2],), (None,), (None,), ([3, 4],), (None,)]
array_schema = StructType([StructField("array",
ArrayType(IntegerType()))])
df = self.spark.createDataFrame(data, schema=array_schema)
@@ -280,7 +266,6 @@ def test_vectorized_udf_null_array(self):
self.assertEquals(df.collect(), result.collect())
def test_vectorized_udf_complex(self):
- from pyspark.sql.functions import pandas_udf, col, expr
df = self.spark.range(10).select(
col('id').cast('int').alias('a'),
col('id').cast('int').alias('b'),
@@ -293,7 +278,6 @@ def test_vectorized_udf_complex(self):
self.assertEquals(expected.collect(), res.collect())
def test_vectorized_udf_exception(self):
- from pyspark.sql.functions import pandas_udf, col
df = self.spark.range(10)
raise_exception = pandas_udf(lambda x: x * (1 / 0), LongType())
with QuietTest(self.sc):
@@ -301,8 +285,8 @@ def test_vectorized_udf_exception(self):
df.select(raise_exception(col('id'))).collect()
def test_vectorized_udf_invalid_length(self):
- from pyspark.sql.functions import pandas_udf, col
import pandas as pd
+
df = self.spark.range(10)
raise_exception = pandas_udf(lambda _: pd.Series(1), LongType())
with QuietTest(self.sc):
@@ -312,7 +296,6 @@ def test_vectorized_udf_invalid_length(self):
df.select(raise_exception(col('id'))).collect()
def test_vectorized_udf_chained(self):
- from pyspark.sql.functions import pandas_udf, col
df = self.spark.range(10)
f = pandas_udf(lambda x: x + 1, LongType())
g = pandas_udf(lambda x: x - 1, LongType())
@@ -320,7 +303,6 @@ def test_vectorized_udf_chained(self):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_wrong_return_type(self):
- from pyspark.sql.functions import pandas_udf
with QuietTest(self.sc):
with self.assertRaisesRegexp(
NotImplementedError,
@@ -328,7 +310,6 @@ def test_vectorized_udf_wrong_return_type(self):
pandas_udf(lambda x: x * 1.0, MapType(LongType(), LongType()))
def test_vectorized_udf_return_scalar(self):
- from pyspark.sql.functions import pandas_udf, col
df = self.spark.range(10)
f = pandas_udf(lambda x: 1.0, DoubleType())
with QuietTest(self.sc):
@@ -336,7 +317,6 @@ def test_vectorized_udf_return_scalar(self):
df.select(f(col('id'))).collect()
def test_vectorized_udf_decorator(self):
- from pyspark.sql.functions import pandas_udf, col
df = self.spark.range(10)
@pandas_udf(returnType=LongType())
@@ -346,21 +326,18 @@ def identity(x):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_empty_partition(self):
- from pyspark.sql.functions import pandas_udf, col
df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
f = pandas_udf(lambda x: x, LongType())
res = df.select(f(col('id')))
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_varargs(self):
- from pyspark.sql.functions import pandas_udf, col
df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
f = pandas_udf(lambda *v: v[0], LongType())
res = df.select(f(col('id')))
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_unsupported_types(self):
- from pyspark.sql.functions import pandas_udf
with QuietTest(self.sc):
with self.assertRaisesRegexp(
NotImplementedError,
@@ -368,8 +345,6 @@ def test_vectorized_udf_unsupported_types(self):
pandas_udf(lambda x: x, MapType(StringType(), IntegerType()))
def test_vectorized_udf_dates(self):
- from pyspark.sql.functions import pandas_udf, col
- from datetime import date
schema = StructType().add("idx", LongType()).add("date", DateType())
data = [(0, date(1969, 1, 1),),
(1, date(2012, 2, 2),),
@@ -405,8 +380,6 @@ def check_data(idx, date, date_copy):
self.assertIsNone(result[i][3]) # "check_data" col
def test_vectorized_udf_timestamps(self):
- from pyspark.sql.functions import pandas_udf, col
- from datetime import datetime
schema = StructType([
StructField("idx", LongType(), True),
StructField("timestamp", TimestampType(), True)])
@@ -447,8 +420,8 @@ def check_data(idx, timestamp, timestamp_copy):
self.assertIsNone(result[i][3]) # "check_data" col
def test_vectorized_udf_return_timestamp_tz(self):
- from pyspark.sql.functions import pandas_udf, col
import pandas as pd
+
df = self.spark.range(10)
@pandas_udf(returnType=TimestampType())
@@ -465,8 +438,8 @@ def gen_timestamps(id):
self.assertEquals(expected, ts)
def test_vectorized_udf_check_config(self):
- from pyspark.sql.functions import pandas_udf, col
import pandas as pd
+
with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch":
3}):
df = self.spark.range(10, numPartitions=1)
@@ -479,9 +452,8 @@ def check_records_per_batch(x):
self.assertTrue(r <= 3)
def test_vectorized_udf_timestamps_respect_session_timezone(self):
- from pyspark.sql.functions import pandas_udf, col
- from datetime import datetime
import pandas as pd
+
schema = StructType([
StructField("idx", LongType(), True),
StructField("timestamp", TimestampType(), True)])
@@ -519,8 +491,6 @@ def
test_vectorized_udf_timestamps_respect_session_timezone(self):
def test_nondeterministic_vectorized_udf(self):
# Test that nondeterministic UDFs are evaluated only once in chained
UDF evaluations
- from pyspark.sql.functions import pandas_udf, col
-
@pandas_udf('double')
def plus_ten(v):
return v + 10
@@ -533,8 +503,6 @@ def plus_ten(v):
self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 10))
def test_nondeterministic_vectorized_udf_in_aggregate(self):
- from pyspark.sql.functions import sum
-
df = self.spark.range(10)
random_udf = self.nondeterministic_vectorized_udf
@@ -545,8 +513,6 @@ def test_nondeterministic_vectorized_udf_in_aggregate(self):
df.agg(sum(random_udf(df.id))).collect()
def test_register_vectorized_udf_basic(self):
- from pyspark.rdd import PythonEvalType
- from pyspark.sql.functions import pandas_udf, col, expr
df = self.spark.range(10).select(
col('id').cast('int').alias('a'),
col('id').cast('int').alias('b'))
@@ -563,11 +529,10 @@ def test_register_vectorized_udf_basic(self):
# Regression test for SPARK-23314
def test_timestamp_dst(self):
- from pyspark.sql.functions import pandas_udf
# Daylight saving time for Los Angeles for 2015 is Sun, Nov 1 at 2:00
am
- dt = [datetime.datetime(2015, 11, 1, 0, 30),
- datetime.datetime(2015, 11, 1, 1, 30),
- datetime.datetime(2015, 11, 1, 2, 30)]
+ dt = [datetime(2015, 11, 1, 0, 30),
+ datetime(2015, 11, 1, 1, 30),
+ datetime(2015, 11, 1, 2, 30)]
df = self.spark.createDataFrame(dt, 'timestamp').toDF('time')
foo_udf = pandas_udf(lambda x: x, 'timestamp')
result = df.withColumn('time', foo_udf(df.time))
@@ -593,7 +558,6 @@ def test_type_annotation(self):
def test_mixed_udf(self):
import pandas as pd
- from pyspark.sql.functions import col, udf, pandas_udf
df = self.spark.range(0, 1).toDF('v')
@@ -696,8 +660,6 @@ def f4(x):
def test_mixed_udf_and_sql(self):
import pandas as pd
- from pyspark.sql import Column
- from pyspark.sql.functions import udf, pandas_udf
df = self.spark.range(0, 1).toDF('v')
@@ -758,7 +720,6 @@ def test_datasource_with_udf(self):
# This needs to a separate test because Arrow dependency is optional
import pandas as pd
import numpy as np
- from pyspark.sql.functions import pandas_udf, lit, col
path = tempfile.mkdtemp()
shutil.rmtree(path)
diff --git a/python/pyspark/sql/tests/test_pandas_udf_window.py
b/python/pyspark/sql/tests/test_pandas_udf_window.py
index f0e6d2696df62..0a7a19c1c0814 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_window.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_window.py
@@ -18,6 +18,8 @@
import unittest
from pyspark.sql.utils import AnalysisException
+from pyspark.sql.functions import array, explode, col, lit, mean, min, max,
rank, \
+ udf, pandas_udf, PandasUDFType
from pyspark.sql.window import Window
from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas,
have_pyarrow, \
pandas_requirement_message, pyarrow_requirement_message
@@ -30,7 +32,6 @@
class WindowPandasUDFTests(ReusedSQLTestCase):
@property
def data(self):
- from pyspark.sql.functions import array, explode, col, lit
return self.spark.range(10).toDF('id') \
.withColumn("vs", array([lit(i * 1.0) + col('id') for i in
range(20, 30)])) \
.withColumn("v", explode(col('vs'))) \
@@ -39,18 +40,14 @@ def data(self):
@property
def python_plus_one(self):
- from pyspark.sql.functions import udf
return udf(lambda v: v + 1, 'double')
@property
def pandas_scalar_time_two(self):
- from pyspark.sql.functions import pandas_udf
return pandas_udf(lambda v: v * 2, 'double')
@property
def pandas_agg_mean_udf(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def avg(v):
return v.mean()
@@ -58,8 +55,6 @@ def avg(v):
@property
def pandas_agg_max_udf(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def max(v):
return v.max()
@@ -67,8 +62,6 @@ def max(v):
@property
def pandas_agg_min_udf(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def min(v):
return v.min()
@@ -88,8 +81,6 @@ def unpartitioned_window(self):
return Window.partitionBy()
def test_simple(self):
- from pyspark.sql.functions import mean
-
df = self.data
w = self.unbounded_window
@@ -105,8 +96,6 @@ def test_simple(self):
self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
def test_multiple_udfs(self):
- from pyspark.sql.functions import max, min, mean
-
df = self.data
w = self.unbounded_window
@@ -121,8 +110,6 @@ def test_multiple_udfs(self):
self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
def test_replace_existing(self):
- from pyspark.sql.functions import mean
-
df = self.data
w = self.unbounded_window
@@ -132,8 +119,6 @@ def test_replace_existing(self):
self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
def test_mixed_sql(self):
- from pyspark.sql.functions import mean
-
df = self.data
w = self.unbounded_window
mean_udf = self.pandas_agg_mean_udf
@@ -144,8 +129,6 @@ def test_mixed_sql(self):
self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
def test_mixed_udf(self):
- from pyspark.sql.functions import mean
-
df = self.data
w = self.unbounded_window
@@ -171,8 +154,6 @@ def test_mixed_udf(self):
self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
def test_without_partitionBy(self):
- from pyspark.sql.functions import mean
-
df = self.data
w = self.unpartitioned_window
mean_udf = self.pandas_agg_mean_udf
@@ -187,8 +168,6 @@ def test_without_partitionBy(self):
self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
def test_mixed_sql_and_udf(self):
- from pyspark.sql.functions import max, min, rank, col
-
df = self.data
w = self.unbounded_window
ow = self.ordered_window
@@ -221,8 +200,6 @@ def test_mixed_sql_and_udf(self):
self.assertPandasEqual(expected4.toPandas(), result4.toPandas())
def test_array_type(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
df = self.data
w = self.unbounded_window
@@ -231,8 +208,6 @@ def test_array_type(self):
self.assertEquals(result1.first()['v2'], [1.0, 2.0])
def test_invalid_args(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
df = self.data
w = self.unbounded_window
ow = self.ordered_window
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]