???????? times??????????????????????????????????????????????????????+20??????????????StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); KafkaSource<String> kafkaSource = KafkaSource.<String>builder() .setBootstrapServers("127.0.0.1:9092") .setTopics("user_behavior") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build();
DataStream<JSONObject> ds = env.fromSource(kafkaSource, WatermarkStrategy .<String>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner(new MyTimeAssigner("times")), "Kafka Source") .map(JSONObject::parseObject);ds.print();