???????? 
times??????????????????????????????????????????????????????+20??????????????
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String&gt; kafkaSource = KafkaSource.<String&gt;builder()
        .setBootstrapServers("127.0.0.1:9092")
        .setTopics("user_behavior")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();


DataStream<JSONObject&gt; ds = env.fromSource(kafkaSource, WatermarkStrategy
                
.<String&gt;forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner(new
 MyTimeAssigner("times")), "Kafka Source")
        .map(JSONObject::parseObject);ds.print();

回复