It turns out that our Flink branch was out of date. Sorry for all the noise. :)

Regards,
Kien

Sent from TypeApp

On Dec 20, 2017, 16:42, at 16:42, Kien Truong <duckientru...@gmail.com> wrote:
>Upon further investigation, we found the reason:
>
>* The cluster was started on YARN with the hadoop classpath, which
>includes Avro. Therefore, Avro's SpecificRecord class was loaded using
>sun.misc.Launcher$AppClassLoader
>
>
>* Our LteSession class was submitted with the application jar, and
>loaded with the child-first classloader
>
>* Flink checks whether LteSession is assignable to SpecificRecord, which
>fails because the two classes come from different classloaders.
>
>* Flink falls back to the reflection-based Avro writer, which throws an
>NPE on a null field.
>
>If we change the classloader to parent-first, everything is ok. Now the
>question is why the default doesn't work for us.
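>
>For reference, a minimal sketch of that workaround, assuming the resolve
>order is set through the classloader.resolve-order option in
>flink-conf.yaml (the file location is an assumption about the setup):
>
>```yaml
># Assumed entry in conf/flink-conf.yaml.
># Flink 1.4 defaults to child-first; parent-first makes the application
># jar's classes resolve against the cluster classpath first.
>classloader.resolve-order: parent-first
>```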
>
>Best regards,
>Kien
>
>On Dec 20, 2017, 14:09, at 14:09, Kien Truong <duckientru...@gmail.com>
>wrote:
>>Hi,
>>
>>After upgrading to Flink 1.4, we encountered this exception:
>>
>>Caused by: java.lang.NullPointerException: in com.viettel.big4g.avro.LteSession in long null of long in field tmsi of com.viettel.big4g.avro.LteSession
>>    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:161)
>>    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
>>    at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:132)
>>    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:176)
>>    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:48)
>>    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>>    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:93)
>>    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:114)
>>    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
>>    at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
>>    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)
>>
>>
>>It seems Flink attempts to use the reflection writer instead of the
>>specific writer for this schema. This is wrong, because our LteSession
>>is an Avro object, and should use the specific writer.
>>
>>Best regards,
>>Kien
