GitHub user kiszk opened a pull request:
https://github.com/apache/spark/pull/17302
[SPARK-19959][SQL] Fix to throw NullPointerException in df[java.lang.Long].collect
## What changes were proposed in this pull request?
This PR fixes a `NullPointerException` in code generated by Catalyst.
Running the following code produces the `NullPointerException` shown below.
The cause is a missing null check for `inputadapter_value`: the variable
`java.lang.Long inputadapter_value` declared at line 30 of the generated code
may hold `null`. This happens when the DataFrame's element type is a nullable
boxed primitive such as `java.lang.Long` and whole-stage codegen is used.
Although the physical plan keeps `nullable=true` in
`input[0, java.lang.Long, true].longValue`, `BoundReference.doGenCode` ignores
`nullable=true`; as a result, no null-check code is generated and a
`NullPointerException` occurs at runtime.
This PR checks the nullability and generates a null check when needed.
```scala
sparkContext.parallelize(Seq[java.lang.Long](0L, null, 2L), 1).toDF.collect
```
```
Caused by: java.lang.NullPointerException
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:37)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:393)
  ...
```
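To see the generated code for a query locally, Spark's codegen debugging helpers can be used (a minimal sketch, assuming a `spark-shell` session where `toDF` and the `debug` implicits are in scope):

```scala
import org.apache.spark.sql.execution.debug._  // adds debugCodegen() to Datasets

// Build the failing DataFrame and print the whole-stage generated Java source,
// which is how listings like the ones below can be obtained.
val df = sparkContext.parallelize(Seq[java.lang.Long](0L, null, 2L), 1).toDF
df.debugCodegen()
```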
Generated code without this PR:
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow serializefromobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 012 */
/* 013 */   public GeneratedIterator(Object[] references) {
/* 014 */     this.references = references;
/* 015 */   }
/* 016 */
/* 017 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 018 */     partitionIndex = index;
/* 019 */     this.inputs = inputs;
/* 020 */     inputadapter_input = inputs[0];
/* 021 */     serializefromobject_result = new UnsafeRow(1);
/* 022 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 023 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 024 */
/* 025 */   }
/* 026 */
/* 027 */   protected void processNext() throws java.io.IOException {
/* 028 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 029 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 030 */       java.lang.Long inputadapter_value = (java.lang.Long)inputadapter_row.get(0, null);
/* 031 */
/* 032 */       boolean serializefromobject_isNull = true;
/* 033 */       long serializefromobject_value = -1L;
/* 034 */       if (!false) {
/* 035 */         serializefromobject_isNull = false;
/* 036 */         if (!serializefromobject_isNull) {
/* 037 */           serializefromobject_value = inputadapter_value.longValue();
/* 038 */         }
/* 039 */
/* 040 */       }
/* 041 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 042 */
/* 043 */       if (serializefromobject_isNull) {
/* 044 */         serializefromobject_rowWriter.setNullAt(0);
/* 045 */       } else {
/* 046 */         serializefromobject_rowWriter.write(0, serializefromobject_value);
/* 047 */       }
/* 048 */       append(serializefromobject_result);
/* 049 */       if (shouldStop()) return;
/* 050 */     }
/* 051 */   }
/* 052 */ }
```
Generated code with this PR (only line 34 is changed):
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow serializefromobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 012 */
/* 013 */   public GeneratedIterator(Object[] references) {
/* 014 */     this.references = references;
/* 015 */   }
/* 016 */
/* 017 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 018 */     partitionIndex = index;
/* 019 */     this.inputs = inputs;
/* 020 */     inputadapter_input = inputs[0];
/* 021 */     serializefromobject_result = new UnsafeRow(1);
/* 022 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 023 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 024 */
/* 025 */   }
/* 026 */
/* 027 */   protected void processNext() throws java.io.IOException {
/* 028 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 029 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 030 */       java.lang.Long inputadapter_value = (java.lang.Long)inputadapter_row.get(0, null);
/* 031 */
/* 032 */       boolean serializefromobject_isNull = true;
/* 033 */       long serializefromobject_value = -1L;
/* 034 */       if (!((false) || inputadapter_row.isNullAt(0))) {
/* 035 */         serializefromobject_isNull = false;
/* 036 */         if (!serializefromobject_isNull) {
/* 037 */           serializefromobject_value = inputadapter_value.longValue();
/* 038 */         }
/* 039 */
/* 040 */       }
/* 041 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 042 */
/* 043 */       if (serializefromobject_isNull) {
/* 044 */         serializefromobject_rowWriter.setNullAt(0);
/* 045 */       } else {
/* 046 */         serializefromobject_rowWriter.write(0, serializefromobject_value);
/* 047 */       }
/* 048 */       append(serializefromobject_result);
/* 049 */       if (shouldStop()) return;
/* 050 */     }
/* 051 */   }
/* 052 */ }
```
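The essence of the change is that the emitted null guard now reflects the bound reference's nullability. A simplified, hypothetical sketch of the idea (not the actual `BoundReference.doGenCode` source):

```scala
// Hypothetical helper illustrating the fix: pick the null-check fragment that
// the code generator splices into the guard. For a non-nullable column this
// stays the constant "false" (yielding `if (!false)` as before); for a
// nullable column it becomes `row.isNullAt(ordinal)`, yielding the guard on
// line 34 above: `if (!((false) || inputadapter_row.isNullAt(0)))`.
def nullCheckCode(rowTerm: String, ordinal: Int, nullable: Boolean): String =
  if (nullable) s"$rowTerm.isNullAt($ordinal)" else "false"
```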
## How was this patch tested?
Added new tests in `DataFrameSuites`.
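A minimal check in the spirit of the added tests (a sketch only; the actual test code and names are in the PR diff):

```scala
import org.apache.spark.sql.Row

// Collecting a DataFrame of boxed longs containing a null must not throw,
// and the null must survive the round trip.
val df = sparkContext.parallelize(Seq[java.lang.Long](0L, null, 2L), 1).toDF
assert(df.collect().toSeq == Seq(Row(0L), Row(null), Row(2L)))
```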
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/kiszk/spark SPARK-19959
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/17302.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #17302
----
commit ec6aa5095138cd6cd22109096463182f326e15f9
Author: Kazuaki Ishizaki <[email protected]>
Date: 2017-03-15T10:09:12Z
initial commit
----