??????????globalWindow????????????trigger????????????????????????????????????????????????????????????times????????????????.public
class PathMonitorJob {
    /*
     * Flink streaming job that reads JSON strings from the Kafka topic "user_behavior",
     * keys them by their "vin" field and evaluates each key inside a global window whose
     * firing is controlled by a purging delta trigger on the numeric "path" field.
     *
     * Command-line arguments (parsed with ParameterTool):
     *   --threshold <double>   delta threshold handed to the trigger, default 1000
     */

    /** Name of the JSON field whose numeric value the delta trigger compares. */
    private static final String PATH = "path";

    /**
     * Builds the topology and submits it under the job name "PathMonitorJob".
     *
     * @param args command-line arguments; only {@code --threshold} is read
     * @throws Exception propagated from building or executing the Flink job
     */
    public static void main(String[] args) throws Exception {
        // The threshold is only needed while the topology is being assembled, so it is a
        // final local instead of a mutable static field written from main().
        final double threshold = ParameterTool.fromArgs(args).getDouble("threshold", 1000d);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Value-only String records from Kafka, starting at the latest offsets.
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("127.0.0.1:9092")
                .setTopics("user_behavior")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Watermarks tolerate 20 seconds of out-of-orderness. MyTimeAssigner is defined
        // elsewhere; presumably "times" names the JSON field holding the event timestamp
        // (TODO confirm against MyTimeAssigner).
        DataStream<JSONObject> ds = env
                .fromSource(
                        kafkaSource,
                        WatermarkStrategy
                                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                                .withTimestampAssigner(new MyTimeAssigner("times")),
                        "Kafka Source")
                .map(JSONObject::parseObject);

        // A global window has no end of its own, so results are emitted only when the
        // trigger fires. Per the Flink API docs, DeltaTrigger fires once the delta function's
        // result (here: new "path" minus the "path" of the element that fired last) exceeds
        // the threshold, and PurgingTrigger clears the window contents on every firing.
        ds.keyBy(value -> value.getString("vin"))
                .window(GlobalWindows.create())
                .trigger(PurgingTrigger.of(DeltaTrigger.of(
                        threshold,
                        (oldDataPoint, newDataPoint) ->
                                newDataPoint.getDoubleValue(PATH) - oldDataPoint.getDoubleValue(PATH),
                        // DeltaTrigger keeps the last fired element in state and needs a
                        // serializer for it.
                        TypeInformation.of(JSONObject.class).createSerializer(env.getConfig()))))
                .process(new CountPathProcess())
                .print();

        env.execute("PathMonitorJob");
    }
}
------------------ ???????? ------------------
??????:
"user-zh"
<[email protected]>;
????????: 2021??10??12??(??????) ????12:32
??????: "[email protected]"<[email protected]>;
????: ??????flink-1.14 ???? kafkasource ????watermark????
Hi
????????????????????????????,???????????????? wm > window.end_time
????????????????????????????,?????? wm
????????????????,????????????????????????????
Best
JasonLee
??2021??10??12?? 11:26??kcz<[email protected]> ??????
????????
times??????????????????????????????????????????????????????+20??????????????StreamExecutionEnvironment
env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("127.0.0.1:9092")
.setTopics("user_behavior")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<JSONObject> ds = env.fromSource(kafkaSource, WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner(new
MyTimeAssigner("times")), "Kafka Source")
.map(JSONObject::parseObject);ds.print();