Thank you for your reply. Please let me know if other classes o full code is
needed.
/**
* Count how many total events
*/
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironment(4, env_config);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
properties.setProperty("group.id", "test");
properties.setProperty("client.id", "flink_test");
properties.setProperty("auto.offset.reset", "earliest");
final int maxEventDelay = 5; // events are out of order by max x
seconds
DataStream<BizEvent> bizs = env.addSource(new
FlinkKafkaConsumer09<>(KAFKA_TOPIC,
new BizSchema(), properties)).
assignTimestampsAndWatermarks(new
AssignerWithPeriodicWatermarks<BizEvent>() {
long curTimeStamp;
@Override
public long extractTimestamp(BizEvent biz, long
currentTimestamp) {
curTimeStamp = currentTimestamp;
return biz.time.getMillis();
}
@Override
public long getCurrentWatermark() {
return (curTimeStamp - (maxEventDelay * 1000));
}
});
DataStream<Tuple2<BizEvent, Integer>> bizCnt = bizs.flatMap(new
CountBiz());
DataStream<Tuple2<String, Integer>> bizWindowTotal =
bizCnt.timeWindowAll(Time.of(5, TimeUnit.MINUTES))
.apply(new SumStartTsAllWindow());
// Output(start time of windows, counts)
public static class SumStartTsAllWindow implements
AllWindowFunction<Iterable<Tuple2<BizEvent, Integer>>,
Tuple2<String, Integer>, TimeWindow> {
private static DateTimeFormatter timeFormatter =
DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").withLocale(Locale.GERMAN).
withZone(DateTimeZone.forID("Europe/Berlin"));
@Override
public void apply(TimeWindow timeWindow,
Iterable<Tuple2<BizEvent, Integer>> values,
Collector<Tuple2<String, Integer>> collector)
throws Exception {
DateTime startTs = new DateTime(timeWindow.getStart(),
DateTimeZone.forID("Europe/Berlin"));
Iterator<Tuple2<BizEvent, Integer>> it = values.iterator();
int sum=0;
while(it.hasNext()){
Tuple2<BizEvent, Integer> value = it.next();
sum += value.f1;
}
collector.collect(new Tuple2<>(startTs.toString(timeFormatter),
sum));
}
}
// Output (BizEvent, 1)
public static class CountBiz implements FlatMapFunction<BizEvent,
Tuple2<BizEvent, Integer>> {
@Override
public void flatMap(BizEvent bizEvent, Collector<Tuple2<BizEvent,
Integer>> collector) {
//System.out.println("TIme in count!: " + bizEvent.time);
collector.collect(new Tuple2<>(bizEvent, (int) 1));
}
}
--
View this message in context:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-way-to-itearte-instances-in-AllWindowFunction-in-current-Master-branch-tp5137p5151.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at
Nabble.com.