Consider the following code snippet:
watermarkStrategy = watermarkStrategy.withTimestampAssigner(
    new SerializableTimestampAssigner<KafkaMessageWrapper<T>>() {
        @Override
        public long extractTimestamp(KafkaMessageWrapper<T> element, long recordTimestamp) {
            try {
                return element.getData().getTimestamp();
            } catch (Exception e) {
                return 86400_000;
            }
        }
    }
);
Why does this cause a serialization error? If I change it to the following, the error goes away:
watermarkStrategy = watermarkStrategy.withTimestampAssigner(
    (SerializableTimestampAssigner<KafkaMessageWrapper<T>>)
        (element, recordTimestamp) -> {
            try {
                return element.getData().getTimestamp();
            } catch (Exception e) {
                return 86400_000;
            }
        }
);
The failure happens when the watermarkStrategy is set on the source: the strategy cannot be serialized.
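
One possible explanation, assuming the snippet sits inside an instance method of a class that is not itself Serializable (the post does not show the enclosing class or the exact error): an anonymous inner class created in an instance method carries a hidden reference to the enclosing instance, so Java serialization tries to write that outer object too. A lambda only captures what its body actually uses. The sketch below reproduces the difference without Flink; `CaptureDemo`, `Assigner`, and `JobBuilder` are hypothetical stand-ins, not names from the original job.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class CaptureDemo {

    // Hypothetical stand-in for Flink's SerializableTimestampAssigner.
    interface Assigner<T> extends Serializable {
        long extractTimestamp(T element, long recordTimestamp);
    }

    // Hypothetical enclosing class. Assumption: it is NOT Serializable.
    static class JobBuilder {

        // Anonymous inner class created in an instance method:
        // it holds an implicit reference to JobBuilder.this.
        Assigner<String> anonymous() {
            return new Assigner<String>() {
                @Override
                public long extractTimestamp(String element, long recordTimestamp) {
                    return 86400_000L;
                }
            };
        }

        // Lambda that uses nothing from the enclosing instance,
        // so it does not capture JobBuilder.this.
        Assigner<String> lambda() {
            return (element, recordTimestamp) -> 86400_000L;
        }
    }

    // Returns true if plain Java serialization can write the object.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        JobBuilder builder = new JobBuilder();
        System.out.println("anonymous class serializable: " + isSerializable(builder.anonymous()));
        System.out.println("lambda serializable: " + isSerializable(builder.lambda()));
    }
}
```

If this is indeed the cause, the lambda is one fix; moving the assigner into a static nested class or a top-level class, or making the enclosing class serializable, should also avoid dragging the outer instance along.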