????????
times??????????????????????????????????????????????????????+20??????????????StreamExecutionEnvironment
env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("127.0.0.1:9092")
.setTopics("user_behavior")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<JSONObject> ds = env.fromSource(kafkaSource, WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner(new
MyTimeAssigner("times")), "Kafka Source")
.map(JSONObject::parseObject);ds.print();