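I have the following code. It consumes data from Kafka and then does a keyBy on the second (server clock) in which each record is processed. This is the code that gets the minute a record falls in: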

    // Formats an epoch-millisecond timestamp as "yyyy-MM-dd HH:mm:ss".
    public static String timeStampToDate(Long timestamp) {
        ThreadLocal<SimpleDateFormat> threadLocal =
                ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
        String format = threadLocal.get().format(new Date(timestamp));
        return format.substring(0, 19);
    }
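
For reference, here is a minimal standalone sketch of what a helper like this returns. `TimeKeyDemo`, `toSecond`, and `toMinute` are hypothetical names that do not appear in the job, and the UTC time zone is pinned only so the output is reproducible. The sketch assumes the intent is a per-minute key; it shows that a 19-character truncation keeps the seconds, while a 16-character truncation would drop them.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical demo class; not part of the original job.
class TimeKeyDemo {
    // One formatter per thread, created once instead of on every call.
    private static final ThreadLocal<SimpleDateFormat> FORMAT =
            ThreadLocal.withInitial(() -> {
                SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                // Assumption: UTC is used only to make the demo deterministic.
                f.setTimeZone(TimeZone.getTimeZone("UTC"));
                return f;
            });

    // Same truncation as timeStampToDate: 19 characters keep the seconds.
    static String toSecond(long timestamp) {
        return FORMAT.get().format(new Date(timestamp)).substring(0, 19);
    }

    // Truncating to 16 characters drops ":ss" and yields a per-minute string.
    static String toMinute(long timestamp) {
        return FORMAT.get().format(new Date(timestamp)).substring(0, 16);
    }

    public static void main(String[] args) {
        long ts = 61_000L; // 1970-01-01 00:01:01 UTC
        System.out.println(toSecond(ts)); // 1970-01-01 00:01:01
        System.out.println(toMinute(ts)); // 1970-01-01 00:01
    }
}
```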



After the keyBy on the minute a record falls in, I use a 1 min tumbling window that triggers every 500 ms, as follows:

.
.
.
        // keyBy on the minute (processing time) a record falls in
        KeyedStream<ShareRealTimeData, String> keyByStream =
                signoutTimeAndWM.keyBy(new KeySelector<ShareRealTimeData, String>() {
            @Override
            public String getKey(ShareRealTimeData value) throws Exception {
                return DateUtilMinutes.timeStampToDate(new Date().getTime());
            }
        });
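
To make the behavior of this key selector concrete, here is a minimal plain-Java sketch with no Flink dependency. `ClockKeyDemo`, `clockKey`, and the injected clock are hypothetical stand-ins, and the key is simplified to the current second as a number. The sketch only illustrates that such a key depends on when the function is called, not on the record; whether that is related to the exception reported below is not established here.

```java
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the KeySelector above; no Flink dependency.
class ClockKeyDemo {
    // Builds a key function that ignores the record and reads the clock.
    static Function<String, Long> clockKey(LongSupplier clockMillis) {
        return record -> clockMillis.getAsLong() / 1000; // key = current second
    }

    public static void main(String[] args) {
        long[] now = {999L}; // fake clock, in milliseconds
        Function<String, Long> key = clockKey(() -> now[0]);

        long first = key.apply("record-1");
        now[0] = 1000L; // the clock crosses a second boundary
        long second = key.apply("record-1");

        // The same record yields two different keys.
        System.out.println(first + " vs " + second); // 0 vs 1
    }
}
```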


        SingleOutputStreamOperator<TreeMap<Double, Tuple2<String, String>>> topNforEveWindow = keyByStream
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(1000)))
                .trigger(ContinuousProcessingTimeTrigger.of(Time.milliseconds(500)))
//                .evictor(TimeEvictor.of(Time.seconds(0), true))
                .process(new MyProcessWindowFuncation());


        //sink
        topNforEveWindow.printToErr("topNforEveWindow====");

.
.
.
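
As a side note on the window size, the sketch below shows the usual alignment arithmetic for a tumbling window with zero offset. It is plain arithmetic under the assumption of non-negative timestamps, not Flink's own code, and `WindowAlignDemo` and `windowStart` are hypothetical names. It suggests that `Time.milliseconds(1000)` gives one-second windows, whereas a one-minute window would correspond to 60000 ms.

```java
// Hypothetical helper illustrating tumbling-window alignment; not Flink code.
class WindowAlignDemo {
    // Start of the zero-offset tumbling window that contains the timestamp.
    // Assumes timestampMs >= 0 and sizeMs > 0.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long ts = 125_750L;
        System.out.println(windowStart(ts, 1_000L));  // 125000: one-second window
        System.out.println(windowStart(ts, 60_000L)); // 120000: one-minute window
    }
}
```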

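While the program is running, it throws the following NullPointerException warning at random, on some whole-minute boundaries: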
19:49:22,001 WARN  org.apache.flink.runtime.taskmanager.Task [] - Window(TumblingProcessingTimeWindows(1000), ContinuousProcessingTimeTrigger, TimeEvictor, ProcessWindowFunction$4) -> Sink: Print to Std. Err (3/8) (222821e43f98390a2f5e3baeb5b542a8) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
        at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:336) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.state.heap.StateTable.put(StateTable.java:159) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:99) ~[flink-runtime_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:203) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) ~[flink-streaming-java_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-runtime_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-runtime_2.11-1.11.2.jar:1.11.2]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]


Could someone please help me figure out what is causing this?



