Hi,我这有一个使用Datastream开发简单例子,接收Kafka(Event
Time)数据并进行开窗聚合。Kafka数据格式如:{"word":"a","count":1,"time":1604286564},可以看看该Demo对你是否有所帮助。
public class MyExample {
public static void main(String[] args) throws Exception {
// 创建环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 设置时间特性为
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 水印策略
WatermarkStrategy<WC> watermarkStrategy = WatermarkStrategy
.<WC>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new
SerializableTimestampAssigner<WC>() {
@Override
public long extractTimestamp(WC wc, long l) {
return wc.getEventTime() * 1000;
}
});
// Kafka 配置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "Kafka地址:9092");
properties.setProperty("group.id", "test");
env.addSource(new FlinkKafkaConsumer<>("flinktest1", new
JSONKeyValueDeserializationSchema(true), properties).setStartFromLatest())
// map 构建 WC 对象
.map(new MapFunction<ObjectNode, WC>() {
@Override
public WC map(ObjectNode jsonNode) throws Exception {
JsonNode valueNode = jsonNode.get("value");
WC wc = new
WC(valueNode.get("word").asText(),valueNode.get("count").asInt(),valueNode.get("time").asLong());
return wc;
}
})
// 设定水印策略
.assignTimestampsAndWatermarks(watermarkStrategy)
.keyBy(WC::getWord)
// 窗口设置,这里设置为滚动窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// 设置窗口延迟
.allowedLateness(Time.seconds(2))
.reduce(new ReduceFunction<WC>() {
@Override
public WC reduce(WC wc, WC t1) throws Exception {
return new WC(wc.getWord(), wc.getCount() +
t1.getCount());
}
})
.print();
env.execute();
}
static class WC {
public String word;
public int count;
public long eventTime;
public long getEventTime() {
return eventTime;
}
public void setEventTime(long eventTime) {
this.eventTime = eventTime;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
public WC(String word, int count) {
this.word = word;
this.count = count;
}
public WC(String word, int count,long eventTime) {
this.word = word;
this.count = count;
this.eventTime = eventTime;
}
@Override
public String toString() {
return "WC{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
--
Sent from: http://apache-flink.147419.n8.nabble.com/