??????????globalWindow????????????trigger????????????????????????????????????????????????????????????times????????????????.public
 class PathMonitorJob {
    private static final String PATH = "path";
    private static double THRESHOLD;
    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        THRESHOLD = parameterTool.getDouble("threshold",1000d);
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<String&gt; kafkaSource = KafkaSource.<String&gt;builder()
                .setBootstrapServers("127.0.0.1:9092")
                .setTopics("user_behavior")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();


        DataStream<JSONObject&gt; ds = env.fromSource(kafkaSource, 
WatermarkStrategy
                        
.<String&gt;forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner(new
 MyTimeAssigner("times")), "Kafka Source")
                .map(JSONObject::parseObject);

        WindowedStream<JSONObject, String, GlobalWindow&gt; windowedStream = 
ds.keyBy(value -&gt; value.getString("vin")).window(GlobalWindows.create());
        windowedStream.trigger(PurgingTrigger.of(DeltaTrigger.of(THRESHOLD
                , (oldDataPoint, newDataPoint) -&gt; 
newDataPoint.getDoubleValue(PATH) - oldDataPoint.getDoubleValue(PATH)
                , 
TypeInformation.of(JSONObject.class).createSerializer(env.getConfig()))))
                .process(new CountPathProcess()).print();
        env.execute("PathMonitorJob");
    }
}




------------------&nbsp;????????&nbsp;------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<[email protected]&gt;;
????????:&nbsp;2021??10??12??(??????) ????12:32
??????:&nbsp;"[email protected]"<[email protected]&gt;;

????:&nbsp;??????flink-1.14 ????  kafkasource ????watermark????



Hi


????????????????????????????,???????????????? wm &gt; window.end_time 
????????????????????????????,?????? wm 
????????????????,????????????????????????????


Best
JasonLee


??2021??10??12?? 11:26??kcz<[email protected]&gt; ??????
???????? 
times??????????????????????????????????????????????????????+20??????????????StreamExecutionEnvironment
 env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String&amp;gt; kafkaSource = KafkaSource.<String&amp;gt;builder()
.setBootstrapServers("127.0.0.1:9092")
.setTopics("user_behavior")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();


DataStream<JSONObject&amp;gt; ds = env.fromSource(kafkaSource, WatermarkStrategy
.<String&amp;gt;forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner(new
 MyTimeAssigner("times")), "Kafka Source")
.map(JSONObject::parseObject);ds.print();

回复