报错信息如下:
java.lang.IllegalArgumentException: Can not set long field
com.xxx.buzz.pojo.AbstractDrRecord.timestamp to null value
at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(
UnsafeFieldAccessorImpl.java:167)
at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(
UnsafeFieldAccessorImpl.java:171)
at sun.reflect.UnsafeLongFieldAccessorImpl.set(
UnsafeLongFieldAccessorImpl.java:80)
at java.lang.reflect.Field.set(Field.java:764)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
.deserialize(PojoSerializer.java:409)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate
.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.
SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
SpillingAdaptiveSpanningRecordDeserializer.java:92)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
.processInput(StreamTwoInputProcessor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
根据堆栈找到报错位置代码为:
try {
for (int i = 0; i < numFields; i++) {
boolean isNull = source.readBoolean();
if (fields[i] != null) {
if (isNull) {
fields[i].set(target, null); // 此处报错,设置null,但这个字段是long基础数据类型,非包装类型。
} else {
Object field = fieldSerializers[i].deserialize(source);
fields[i].set(target, field);
}
} else if (!isNull) {
// read and dump a pre-existing field value
fieldSerializers[i].deserialize(source);
}
}
} catch (IllegalAccessException e) {
throw new RuntimeException("Error during POJO copy, this should not
happen since we check the fields before.", e);
}