I'm trying to use the Flink DataSet API to validate some records and have
run into an issue. My program uses joins to validate inputs against
reference data. One of the attributes I'm validating is optional, and only
needs to be validated when non-NULL. So I added a filter to prevent the
null-keyed records from being used in the validation join, and was
surprised to receive this exception:

java.lang.RuntimeException: A NullPointerException occured while accessing
a key field in a POJO. Most likely, the value grouped/joined on is null.
Field name: optionalKey
    at org.apache.flink.api.java.typeutils.runtime.PojoComparator.hash(PojoComparator.java:199)

It looks like the problem is that Flink has moved the join's hash
partitioning ahead of the filter that removes the null-keyed records, so it
ends up trying to hash the null keys. The reordering can be seen in the plan
visualization:
https://raw.githubusercontent.com/dkadams/flink-plan-issue/master/plan-visualization.png
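To make the suspected ordering problem concrete, here is a minimal plain-Java sketch that does not use Flink at all. The names in it (NullKeyOrderingSketch, Rec, partitionOf, filterThenHash, hashThenFilter) are all made up for illustration, and partitionOf is only a stand-in for hashing a record on its join key, not Flink's actual PojoComparator logic. It shows that the same filter and the same hash step succeed in one order and throw a NullPointerException in the other.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustration only: plain Java, no Flink classes involved.
class NullKeyOrderingSketch {

    // Hypothetical record type standing in for the POJO being validated.
    static final class Rec {
        final String id;
        final String optionalKey; // may be null

        Rec(String id, String optionalKey) {
            this.id = id;
            this.optionalKey = optionalKey;
        }
    }

    // Stand-in for hash partitioning on the join key.
    // Throws NullPointerException when optionalKey is null.
    static int partitionOf(Rec r, int parallelism) {
        return Math.floorMod(r.optionalKey.hashCode(), parallelism);
    }

    // Order written in the program: drop null keys first, then hash.
    static Map<Integer, List<Rec>> filterThenHash(List<Rec> input, int parallelism) {
        return input.stream()
                .filter(r -> r.optionalKey != null)
                .collect(Collectors.groupingBy(r -> partitionOf(r, parallelism)));
    }

    // Order the plan appears to use: hash every record first, filter afterwards.
    // The filter never gets a chance to run for the null-keyed record.
    static List<Rec> hashThenFilter(List<Rec> input, int parallelism) {
        Map<Integer, List<Rec>> partitions = input.stream()
                .collect(Collectors.groupingBy(r -> partitionOf(r, parallelism)));
        List<Rec> kept = new ArrayList<>();
        for (List<Rec> partition : partitions.values()) {
            for (Rec r : partition) {
                if (r.optionalKey != null) {
                    kept.add(r);
                }
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Rec> input = Arrays.asList(
                new Rec("a", "k1"),
                new Rec("b", null),
                new Rec("c", "k2"));

        int kept = filterThenHash(input, 4).values().stream().mapToInt(List::size).sum();
        System.out.println("filter then hash kept " + kept + " records");

        try {
            hashThenFilter(input, 4);
            System.out.println("hash then filter succeeded");
        } catch (NullPointerException e) {
            System.out.println("hash then filter failed with NullPointerException");
        }
    }
}
```

If the optimizer really does treat the filter and the partitioning as freely reorderable, this would explain why a filter that looks sufficient in the program text does not protect the join.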

I was able to reproduce the problem in v1.4.2 and v1.5.2 with this small
project: https://github.com/dkadams/flink-plan-issue/

Is this expected behavior or a bug? FLINK-1915 seems to have the same root
problem, but with a negative performance impact instead of a
RuntimeException.

Regards,
Dylan
