Nicholas Chammas created SPARK-18589:
----------------------------------------

             Summary: persist() resolves "java.lang.RuntimeException: Invalid PythonUDF <lambda>(...), requires attributes from more than one child"
                 Key: SPARK-18589
                 URL: https://issues.apache.org/jira/browse/SPARK-18589
             Project: Spark
          Issue Type: Bug
          Components: PySpark, SQL
    Affects Versions: 2.0.2, 2.1.0
         Environment: Python 3.5, Java 8
            Reporter: Nicholas Chammas
            Priority: Minor


This smells like another optimizer bug, similar to SPARK-17100 and SPARK-18254. I'm seeing it on 2.0.2 and on master at commit {{fb07bbe575aabe68422fd3a31865101fb7fa1722}}.

I don't have a minimal repro for this yet, but the error I'm seeing is:

{code}
py4j.protocol.Py4JJavaError: An error occurred while calling o247.count.
: java.lang.RuntimeException: Invalid PythonUDF <...>(...), requires attributes from more than one child.
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:150)
    at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:149)
    at scala.collection.immutable.Stream.foreach(Stream.scala:594)
    at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:149)
    at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
    at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:113)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:312)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:312)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:311)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
    at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:113)
    at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:93)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:93)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
    at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2555)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
{code}

The extended plan (cleaned of field names) is as follows:

{code}
== Parsed Logical Plan ==
'Filter NOT ('expected_prediction = 'prediction)
+- Project [p1, p2, pair_features, rawPrediction, probability, prediction, cast((p1._testing_universal_key = p2._testing_universal_key) as float) AS expected_prediction]
   +- Project [p1, p2, pair_features, rawPrediction, probability, UDF(rawPrediction) AS prediction]
      +- Project [p1, p2, pair_features, rawPrediction, UDF(rawPrediction) AS probability]
         +- Project [p1, p2, pair_features, UDF(pair_features) AS rawPrediction]
            +- Project [p1, p2, <lambda>(p1.person, p2.person) AS pair_features]
               +- Project [struct(...) AS p1, struct(...) AS p2]
                  +- Project [_blocking_key, ..., ...]
                     +- Join Inner, (_blocking_key = _blocking_key)
                        :- SubqueryAlias p1
                        :  +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
                        :     +- Project [...]
                        :        +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
                        :           +- Project [...]
                        :              +- Project [_testing_universal_key, primary_key, struct(...) AS person]
                        :                 +- LogicalRDD [...]
                        +- SubqueryAlias p2
                           +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
                              +- Project [...]
                                 +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
                                    +- Project [...]
                                       +- Project [_testing_universal_key, primary_key, struct(...) AS person]
                                          +- LogicalRDD [...]

== Analyzed Logical Plan ==
p1: struct<...>, p2: struct<...>, pair_features: vector, rawPrediction: vector, probability: vector, prediction: double, expected_prediction: float
Filter NOT (cast(expected_prediction as double) = prediction)
+- Project [p1, p2, pair_features, rawPrediction, probability, prediction, cast((p1._testing_universal_key = p2._testing_universal_key) as float) AS expected_prediction]
   +- Project [p1, p2, pair_features, rawPrediction, probability, UDF(rawPrediction) AS prediction]
      +- Project [p1, p2, pair_features, rawPrediction, UDF(rawPrediction) AS probability]
         +- Project [p1, p2, pair_features, UDF(pair_features) AS rawPrediction]
            +- Project [p1, p2, <lambda>(p1.person, p2.person) AS pair_features]
               +- Project [struct(...) AS p1, struct(...) AS p2]
                  +- Project [_blocking_key, ..., ...]
                     +- Join Inner, (_blocking_key = _blocking_key)
                        :- SubqueryAlias p1
                        :  +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
                        :     +- Project [...]
                        :        +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
                        :           +- Project [...]
                        :              +- Project [_testing_universal_key, primary_key, struct(...) AS person]
                        :                 +- LogicalRDD [...]
                        +- SubqueryAlias p2
                           +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
                              +- Project [...]
                                 +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
                                    +- Project [...]
                                       +- Project [_testing_universal_key, primary_key, struct(...) AS person]
                                          +- LogicalRDD [...]

== Optimized Logical Plan ==
Project [struct(...) AS p1, struct(...) AS p2, <lambda>(struct(...).person, struct(...).person) AS pair_features, UDF(<lambda>(struct(...).person, struct(...).person)) AS rawPrediction, UDF(UDF(<lambda>(struct(...).person, struct(...).person))) AS probability, UDF(UDF(<lambda>(struct(...).person, struct(...).person))) AS prediction, cast((struct(...)._testing_universal_key = struct(...)._testing_universal_key) as float) AS expected_prediction]
+- Join Inner, (NOT (cast(cast((struct(...)._testing_universal_key = struct(...)._testing_universal_key) as float) as double) = UDF(UDF(<lambda>(struct(...).person, struct(...).person)))) && (_blocking_key = _blocking_key))
   :- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
   :  +- Filter isnotnull(<lambda>(dataset_name, primary_key, person))
   :     +- InMemoryRelation [...], true, 10000, StorageLevel(memory, 1 replicas)
   :        :  +- *Project [primary_key, struct(...) AS person, test_people AS dataset_name]
   :        :     +- Scan ExistingRDD[...]
   +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
      +- Filter isnotnull(<lambda>(dataset_name, primary_key, person))
         +- InMemoryRelation [...], true, 10000, StorageLevel(memory, 1 replicas)
            :  +- *Project [primary_key, struct(...) AS person, test_people AS dataset_name]
            :     +- Scan ExistingRDD[...]

== Physical Plan ==
java.lang.RuntimeException: Invalid PythonUDF <lambda>(struct(...).person, struct(...).person), requires attributes from more than one child.
{code}

Note the error at the end when Spark tries to print the physical plan. I've 
scrubbed some Project fields from the plan to simplify the display, but if I've 
scrubbed anything you think is important let me know.

I can get around this problem by adding a {{persist()}} right before the operation that fails; the failing operation is a filter (see the sketch below).
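
For illustration only: the following is a hypothetical sketch, not my actual code and not a confirmed repro. It just mirrors the rough shape of the plan above (two aliases of a dataset joined on a UDF-derived blocking key, a pairwise Python UDF, then a filter comparing the UDF output against an expected value) and shows where the {{persist()}} goes relative to the failing filter in my real pipeline. All names and data are made up.

{code}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, StringType

spark = SparkSession.builder.getOrCreate()

# Toy data standing in for the real records (hypothetical).
people = spark.createDataFrame(
    [(1, "alice"), (2, "bob"), (3, "alice")],
    ["primary_key", "person"],
)

# Python UDF deriving a blocking key from each record.
blocking_key = F.udf(lambda person: person[0], StringType())

# Python UDF scoring a pair of records (stands in for the real
# pair_features / prediction UDFs).
pair_score = F.udf(lambda a, b: 1.0 if a == b else 0.0, DoubleType())

keyed = people.withColumn("_blocking_key", blocking_key("person"))
p1 = keyed.alias("p1")
p2 = keyed.alias("p2")

# Self-join on the blocking key, then compute prediction and expected_prediction.
pairs = p1.join(p2, F.col("p1._blocking_key") == F.col("p2._blocking_key")).select(
    pair_score(F.col("p1.person"), F.col("p2.person")).alias("prediction"),
    (F.col("p1.primary_key") == F.col("p2.primary_key"))
        .cast("float").alias("expected_prediction"),
)

# Workaround: persist() right before the filter that otherwise fails with
# "Invalid PythonUDF ..., requires attributes from more than one child".
pairs = pairs.persist()

mismatches = pairs.filter(F.col("expected_prediction") != F.col("prediction"))
print(mismatches.count())
{code}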

Any clues on how I can boil this down to a minimal repro, or on where the problem might be?



