??????????
// Convert each JSON record coming out of `filter` into a LogBean via Json2LogBean.
SingleOutputStreamOperator<LogBean> data = filter.map(new Json2LogBean());

// Assign event-time timestamps, reshape every record into a (nickname, record, 1) tuple,
// and key the stream by nickname.
KeyedStream<Tuple3<String, String, Integer>, String> tuple3StringKeyedStream = data
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<LogBean>() {
            /**
             * Parses the record's operTime ("yyyy-MM-dd HH:mm:ss", interpreted at offset +8)
             * into the event timestamp.
             *
             * @param element the record whose operation time is parsed
             * @return the event time in milliseconds since the epoch
             */
            @Override
            public long extractAscendingTimestamp(LogBean element) {
                // The formatter is built here rather than stored in a field because
                // DateTimeFormatter is not Serializable and this function object is shipped
                // to the workers.
                LocalDateTime operTime = LocalDateTime.parse(
                        element.getOperTime(),
                        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                // Flink timestamps are epoch MILLISECONDS. Returning epoch seconds would make
                // the 5-second (5000 ms) session gap below span roughly 5000 real seconds.
                long eventTime = operTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
                System.out.println(eventTime);
                return eventTime;
            }
        })
        .map(new MapFunction<LogBean, Tuple3<String, String, Integer>>() {
            /**
             * Builds the tuple that is keyed and aggregated downstream.
             *
             * @param value the parsed log record
             * @return (nickname, string form of the record, 1)
             */
            @Override
            public Tuple3<String, String, Integer> map(LogBean value) throws Exception {
                // f0 = nickname (used as the key), f1 = the record's toString(), f2 = count of 1.
                return new Tuple3<>(value.getNickname(), value.toString(), 1);
            }
        })
        .keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
            /**
             * @param value the tuple produced by the preceding map step
             * @return the nickname stored in field f0
             */
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0;
            }
        });

// Group each key's records into event-time session windows with a 5-second gap.
WindowedStream<Tuple3<String, String, Integer>, String, TimeWindow> window =
        tuple3StringKeyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));

// Sum tuple field 2 (the per-record count of 1) within each window and print the result.
window.sum(2).print();
????sum????????reduce????reduce????????????????????reduce??????????????????????????????????????????
????????????????
// Convert each JSON record coming out of `filter` into a LogBean via Json2LogBean.
SingleOutputStreamOperator<LogBean> data = filter.map(new Json2LogBean());

// Assign event-time timestamps, reshape every record into a (nickname, record, 1) tuple,
// and key the stream by nickname.
KeyedStream<Tuple3<String, String, Integer>, String> tuple3StringKeyedStream = data
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<LogBean>() {
            /**
             * Parses the record's operTime ("yyyy-MM-dd HH:mm:ss", interpreted at offset +8)
             * into the event timestamp.
             *
             * @param element the record whose operation time is parsed
             * @return the event time in milliseconds since the epoch
             */
            @Override
            public long extractAscendingTimestamp(LogBean element) {
                // The formatter is built here rather than stored in a field because
                // DateTimeFormatter is not Serializable and this function object is shipped
                // to the workers.
                LocalDateTime operTime = LocalDateTime.parse(
                        element.getOperTime(),
                        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                // Flink timestamps are epoch MILLISECONDS. Returning epoch seconds would make
                // the 5-second (5000 ms) session gap below span roughly 5000 real seconds.
                long eventTime = operTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
                System.out.println(eventTime);
                return eventTime;
            }
        })
        .map(new MapFunction<LogBean, Tuple3<String, String, Integer>>() {
            /**
             * Builds the tuple that is keyed and aggregated downstream.
             *
             * @param value the parsed log record
             * @return (nickname, string form of the record, 1)
             */
            @Override
            public Tuple3<String, String, Integer> map(LogBean value) throws Exception {
                // f0 = nickname (used as the key), f1 = the record's toString(), f2 = count of 1.
                return new Tuple3<>(value.getNickname(), value.toString(), 1);
            }
        })
        .keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
            /**
             * @param value the tuple produced by the preceding map step
             * @return the nickname stored in field f0
             */
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0;
            }
        });

// Group each key's records into event-time session windows with a 5-second gap.
WindowedStream<Tuple3<String, String, Integer>, String, TimeWindow> window =
        tuple3StringKeyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));

// Sum tuple field 2 (the per-record count of 1) within each window and print the result.
window.sum(2).print();
????sum????????reduce????reduce????????????????????reduce??????????????????????????????????????????
??????