Github user EntilZha commented on the pull request:
https://github.com/apache/spark/pull/7580#issuecomment-123805715
A few things:
I added the Python API, but I am getting a runtime exception, which is
causing it to fall back to something else and then succeed; stack trace below:
```java
15/07/22 10:39:02 ERROR Project: Failed to generate mutable projection,
fallback to interpreted
java.util.concurrent.ExecutionException: java.lang.Exception: failed to
compile:
public Object
generate(org.apache.spark.sql.catalyst.expressions.Expression[] expr) {
return new SpecificProjection(expr);
}
class SpecificProjection extends
org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
private org.apache.spark.sql.catalyst.expressions.Expression[]
expressions;
private org.apache.spark.sql.catalyst.expressions.MutableRow
mutableRow;
public
SpecificProjection(org.apache.spark.sql.catalyst.expressions.Expression[] expr)
{
expressions = expr;
mutableRow = new
org.apache.spark.sql.catalyst.expressions.GenericMutableRow(1);
}
public
org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection
target(org.apache.spark.sql.catalyst.expressions.MutableRow row) {
mutableRow = row;
return this;
}
/* Provide immutable access to the last projected row. */
public InternalRow currentValue() {
return (InternalRow) mutableRow;
}
public Object apply(Object _i) {
InternalRow i = (InternalRow) _i;
boolean isNull2 = i.isNullAt(0);
scala.collection.Seq primitive3 = isNull2 ?
null : ((scala.collection.Seq)i.apply(0));
boolean isNull0 = false;
if (isNull2) {
primitive1 = false;
} else {
primitive1 = primitive3.contains(1);
}
if(isNull0)
mutableRow.setNullAt(0);
else
mutableRow.setBoolean(0, primitive1);
return mutableRow;
}
}
at
com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
at
com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
at
com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
at
com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
at
com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
at
com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
at
com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4004)
at
com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:280)
at
org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.create(GenerateMutableProjection.scala:121)
at
org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.create(GenerateMutableProjection.scala:31)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:335)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:332)
at
org.apache.spark.sql.execution.SparkPlan.newMutableProjection(SparkPlan.scala:209)
at
org.apache.spark.sql.execution.Project.buildProjection$lzycompute(basicOperators.scala:42)
at
org.apache.spark.sql.execution.Project.buildProjection(basicOperators.scala:42)
at
org.apache.spark.sql.execution.Project$$anonfun$1.apply(basicOperators.scala:45)
at
org.apache.spark.sql.execution.Project$$anonfun$1.apply(basicOperators.scala:44)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:64)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: failed to compile:
public Object
generate(org.apache.spark.sql.catalyst.expressions.Expression[] expr) {
return new SpecificProjection(expr);
}
class SpecificProjection extends
org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
private org.apache.spark.sql.catalyst.expressions.Expression[]
expressions;
private org.apache.spark.sql.catalyst.expressions.MutableRow
mutableRow;
public
SpecificProjection(org.apache.spark.sql.catalyst.expressions.Expression[] expr)
{
expressions = expr;
mutableRow = new
org.apache.spark.sql.catalyst.expressions.GenericMutableRow(1);
}
public
org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection
target(org.apache.spark.sql.catalyst.expressions.MutableRow row) {
mutableRow = row;
return this;
}
/* Provide immutable access to the last projected row. */
public InternalRow currentValue() {
return (InternalRow) mutableRow;
}
public Object apply(Object _i) {
InternalRow i = (InternalRow) _i;
boolean isNull2 = i.isNullAt(0);
scala.collection.Seq primitive3 = isNull2 ?
null : ((scala.collection.Seq)i.apply(0));
boolean isNull0 = false;
if (isNull2) {
primitive1 = false;
} else {
primitive1 = primitive3.contains(1);
}
if(isNull0)
mutableRow.setNullAt(0);
else
mutableRow.setBoolean(0, primitive1);
return mutableRow;
}
}
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:302)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:322)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:319)
at
com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at
com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
... 32 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 40, Column
21: Expression "primitive1" is not an rvalue
at
org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:10174)
at
org.codehaus.janino.UnitCompiler.toRvalueOrCompileException(UnitCompiler.java:6036)
at
org.codehaus.janino.UnitCompiler.compileContext2(UnitCompiler.java:3172)
at org.codehaus.janino.UnitCompiler.access$5400(UnitCompiler.java:185)
at
org.codehaus.janino.UnitCompiler$9.visitAmbiguousName(UnitCompiler.java:3150)
at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:3138)
at
org.codehaus.janino.UnitCompiler.compileContext(UnitCompiler.java:3160)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2668)
at org.codehaus.janino.UnitCompiler.access$4500(UnitCompiler.java:185)
at
org.codehaus.janino.UnitCompiler$7.visitAssignment(UnitCompiler.java:2619)
at org.codehaus.janino.Java$Assignment.accept(Java.java:3405)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2654)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1643)
at org.codehaus.janino.UnitCompiler.access$1100(UnitCompiler.java:185)
at
org.codehaus.janino.UnitCompiler$4.visitExpressionStatement(UnitCompiler.java:936)
at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2097)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:958)
at
org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1007)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:993)
at org.codehaus.janino.UnitCompiler.access$1000(UnitCompiler.java:185)
at org.codehaus.janino.UnitCompiler$4.visitBlock(UnitCompiler.java:935)
at org.codehaus.janino.Java$Block.accept(Java.java:2012)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:958)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1742)
at org.codehaus.janino.UnitCompiler.access$1200(UnitCompiler.java:185)
at
org.codehaus.janino.UnitCompiler$4.visitIfStatement(UnitCompiler.java:937)
at org.codehaus.janino.Java$IfStatement.accept(Java.java:2157)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:958)
at
org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1007)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2293)
at
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:822)
at
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:794)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:507)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
at
org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
at
org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
at
org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
at
org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
at
org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
at
org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
at
org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
at
org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:77)
at
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:297)
```
Also, based on the discussion above:
1. We should be using `eval`, since we are overriding the default behavior for
`null` arguments.
2. Perhaps I should remove the string `array_contains` in
`functions.scala`; however, I am not sure I understand why it should be removed.
3. There are some tests in `DataFrameFunctionsSuite.scala` which fail and
test behavior under null conditions. I am not sure what the proper way is to
tell Spark SQL that an argument is of a type matching the collection's element
type (say IntegerType) but nullable, and then to pass it a null.
4. Do I need to do anything to check comparability of types, then? If
all Catalyst types are comparable, then it seems I shouldn't have to.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]