Hi,
I'm encountering a strange NPE when running an aggregation on a fixed
(tumbling) window. If I set a small parallelism, so that the whole job runs
on only one TaskManager, the NPE does not happen. But when the job scales out
to two TaskManagers, the TaskManager crashes at the Create stage. I'm using
Flink 1.11.1.
The exception stack trace is:
2021-04-13 14:23:19,575 WARN org.apache.flink.runtime.taskmanager.Task [] - Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateDataEntry, PassThroughWindowFunction) -> Flat Map -> Sink: Unnamed (7/10) (7244f264349013ca7d5336fcd565bc9f) switched from RUNNING to FAILED.
java.io.IOException: Exception while applying AggregateFunction in aggregating state
    at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: java.lang.NullPointerException
    at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:203) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    ... 13 more
My aggregate function is:
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class AggregateDataEntry implements
        AggregateFunction<Tuple2<DataKey, DataIndex>, Map<DataKey, DataIndex>,
                Map<DataKey, DataIndex>> {

    // A fresh, empty map for each window
    @Override
    public Map<DataKey, DataIndex> createAccumulator() {
        return new HashMap<>();
    }

    // Insert the entry, or combine it with the existing entry for the same DataKey
    @Override
    public Map<DataKey, DataIndex> add(Tuple2<DataKey, DataIndex> value,
                                       Map<DataKey, DataIndex> accumulator) {
        accumulator.merge(value.f0, value.f1, DataIndex::add);
        return accumulator;
    }

    // Emit the accumulated map as-is
    @Override
    public Map<DataKey, DataIndex> getResult(Map<DataKey, DataIndex> accumulator) {
        return accumulator;
    }

    // Fold the entries of a into b and return b
    @Override
    public Map<DataKey, DataIndex> merge(Map<DataKey, DataIndex> a,
                                         Map<DataKey, DataIndex> b) {
        a.forEach((dataKey, dataIndex) -> b.merge(dataKey, dataIndex, DataIndex::add));
        return b;
    }
}
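To try the accumulator logic on its own, outside Flink, here is a minimal plain-Java sketch. DataKey and DataIndex are not shown above, so the versions below are hypothetical stand-ins: DataKey is assumed to be a value class whose equals/hashCode depend only on its fields (it is used as a HashMap key), and DataIndex is assumed to be an immutable counter whose instance method add combines two values. The static add and merge methods mirror the bodies of AggregateDataEntry without the Flink interface.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class AccumulatorSketch {

    // Hypothetical key type: equals/hashCode depend only on field values,
    // so equal keys land in the same HashMap entry and hash the same in every JVM.
    static final class DataKey {
        final String name;
        final int bucket;

        DataKey(String name, int bucket) {
            this.name = name;
            this.bucket = bucket;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof DataKey)) return false;
            DataKey other = (DataKey) o;
            return bucket == other.bucket && name.equals(other.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, bucket);
        }
    }

    // Hypothetical value type: an immutable count combined by addition.
    static final class DataIndex {
        final long count;

        DataIndex(long count) {
            this.count = count;
        }

        // Instance method, so DataIndex::add fits Map.merge's BiFunction.
        DataIndex add(DataIndex other) {
            return new DataIndex(count + other.count);
        }
    }

    static Map<DataKey, DataIndex> createAccumulator() {
        return new HashMap<>();
    }

    // Same logic as AggregateDataEntry.add: insert, or combine per key.
    static Map<DataKey, DataIndex> add(DataKey key, DataIndex value,
                                       Map<DataKey, DataIndex> acc) {
        acc.merge(key, value, DataIndex::add);
        return acc;
    }

    // Same logic as AggregateDataEntry.merge: fold a into b and return b.
    static Map<DataKey, DataIndex> merge(Map<DataKey, DataIndex> a,
                                         Map<DataKey, DataIndex> b) {
        a.forEach((k, v) -> b.merge(k, v, DataIndex::add));
        return b;
    }

    public static void main(String[] args) {
        Map<DataKey, DataIndex> acc = createAccumulator();
        add(new DataKey("a", 1), new DataIndex(2), acc);
        add(new DataKey("a", 1), new DataIndex(3), acc);
        add(new DataKey("b", 1), new DataIndex(1), acc);

        Map<DataKey, DataIndex> other = createAccumulator();
        add(new DataKey("a", 1), new DataIndex(10), other);

        Map<DataKey, DataIndex> merged = merge(acc, other);
        System.out.println(merged.size());                         // 2
        System.out.println(merged.get(new DataKey("a", 1)).count); // 15
    }
}
```

Running main prints 2 and then 15: the two "a" entries are combined inside one accumulator (2 + 3), and merge then folds that into the other accumulator (5 + 10).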
Does anyone know what might cause this NPE? Thanks!
--
Best regards
Sili Liu