Hi,我这有一个使用 DataStream API 开发的简单例子:接收 Kafka 数据(基于 Event Time)并进行开窗聚合。
Kafka 数据格式如:{"word":"a","count":1,"time":1604286564},其中 time 为秒级时间戳。可以看看该 Demo 对你是否有所帮助。

/**
 * Minimal Flink DataStream job: reads JSON records from Kafka, assigns event-time
 * timestamps and watermarks, and sums {@code count} per {@code word} in 5-second
 * tumbling event-time windows, printing each window result.
 *
 * <p>Expected record value, e.g. {@code {"word":"a","count":1,"time":1604286564}};
 * {@code time} is multiplied by 1000 below, i.e. it is treated as epoch seconds.
 */
public class MyExample {

    /**
     * Builds the pipeline and submits it for execution.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or fails while running
     */
    public static void main(String[] args) throws Exception {
        // Create the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Set the time characteristic to event time.
        // NOTE(review): on Flink 1.12+ event time is the default and this call is
        // deprecated; keep it only when targeting older versions.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Watermark strategy: tolerate up to 2 seconds of out-of-order events.
        WatermarkStrategy<WC> watermarkStrategy = WatermarkStrategy
                .<WC>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<WC>() {
                    @Override
                    public long extractTimestamp(WC wc, long recordTimestamp) {
                        // The payload carries seconds; Flink timestamps are milliseconds.
                        return wc.getEventTime() * 1000L;
                    }
                });

        // Kafka consumer configuration.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "Kafka地址:9092");
        properties.setProperty("group.id", "test");

        env.addSource(
                        new FlinkKafkaConsumer<>(
                                "flinktest1",
                                new JSONKeyValueDeserializationSchema(true),
                                properties)
                                .setStartFromLatest())
                // Map the JSON "value" node of each Kafka record to a WC object.
                .map(new MapFunction<ObjectNode, WC>() {
                    @Override
                    public WC map(ObjectNode jsonNode) throws Exception {
                        JsonNode valueNode = jsonNode.get("value");
                        return new WC(
                                valueNode.get("word").asText(),
                                valueNode.get("count").asInt(),
                                valueNode.get("time").asLong());
                    }
                })
                // Attach timestamps and watermarks.
                .assignTimestampsAndWatermarks(watermarkStrategy)
                .keyBy(WC::getWord)
                // Tumbling event-time windows of 5 seconds.
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // Accept elements arriving up to 2 seconds after the watermark passes the window end.
                .allowedLateness(Time.seconds(2))
                .reduce(new ReduceFunction<WC>() {
                    @Override
                    public WC reduce(WC left, WC right) throws Exception {
                        // Sum the counts and keep the most recent event time instead of
                        // letting it fall back to 0 via the two-argument constructor.
                        return new WC(
                                left.getWord(),
                                left.getCount() + right.getCount(),
                                Math.max(left.getEventTime(), right.getEventTime()));
                    }
                })
                .print();

        env.execute();
    }


    /**
     * Word-count record flowing through the pipeline.
     *
     * <p>Declared {@code public static} with a public no-argument constructor and
     * public, non-final fields so that it satisfies Flink's POJO rules and does not
     * have to be treated as a generic type.
     */
    public static class WC {
        public String word;
        public int count;
        /** Event time as read from the {@code time} field of the record (seconds). */
        public long eventTime;

        /** No-argument constructor required for Flink POJO handling. */
        public WC() {
        }

        /**
         * Creates a record without an event time ({@code eventTime} stays 0).
         *
         * @param word  the word
         * @param count the count for that word
         */
        public WC(String word, int count) {
            this.word = word;
            this.count = count;
        }

        /**
         * Creates a fully populated record.
         *
         * @param word      the word
         * @param count     the count for that word
         * @param eventTime the event time of the record
         */
        public WC(String word, int count, long eventTime) {
            this.word = word;
            this.count = count;
            this.eventTime = eventTime;
        }

        public long getEventTime() {
            return eventTime;
        }

        public void setEventTime(long eventTime) {
            this.eventTime = eventTime;
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public int getCount() {
            return count;
        }

        public void setCount(int count) {
            this.count = count;
        }

        /** Returns {@code WC{word='<word>', count=<count>}}; the event time is not included. */
        @Override
        public String toString() {
            return "WC{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复