I've been working through the Flink demo applications and started in on a
prototype, but I've run into an issue with how to approach the problem of
getting a daily unique user count from a traffic stream. I'm using event
time as the time characteristic.

Sample event stream (timestamp,userid):
2015-12-02T01:13:21.002Z,bc030136a91aa46eb436dcb28fa72fed
2015-12-02T01:13:21.003Z,bc030136a91aa46eb436dcb28fa72fed
2015-12-02T01:37:48.003Z,bc030136a91aa46eb436dcb28fa72fed
2015-12-02T01:37:48.004Z,bc030136a91aa46eb436dcb28fa72fed
2015-12-02T02:02:15.004Z,bc030136a91aa46eb436dcb28fa72fed
2015-12-02T00:00:00.000Z,5dd63d9756a975d0f4be6a6856005381
2015-12-02T00:16:58.000Z,5dd63d9756a975d0f4be6a6856005381
2015-12-02T00:00:00.000Z,ccd72e4535c92c499bb66eea6f4f9aab
2015-12-02T00:14:56.000Z,ccd72e4535c92c499bb66eea6f4f9aab
2015-12-02T00:14:56.001Z,ccd72e4535c92c499bb66eea6f4f9aab
2015-12-02T00:29:52.001Z,ccd72e4535c92c499bb66eea6f4f9aab
2015-12-02T00:29:52.002Z,ccd72e4535c92c499bb66eea6f4f9aab
2015-12-02T00:44:48.002Z,ccd72e4535c92c499bb66eea6f4f9aab
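
For context, here is a minimal sketch of how a stream like this could be
wired up for event time ahead of the windowing further below. It assumes
Joda-Time DateTime, a raw DataStream<String> named rawLines carrying the
"timestamp,userid" lines above, a yyyy-MM-dd date-only field, and a Flink
version that ships BoundedOutOfOrdernessTimestampExtractor (the 10-minute
out-of-orderness bound is just a placeholder, not the actual pipeline):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Tuple3<DateTime, String, String>> logins = rawLines
    // Parse "2015-12-02T01:13:21.002Z,bc030136..." into (timestamp, date-only, userid)
    .map(new MapFunction<String, Tuple3<DateTime, String, String>>() {
        @Override
        public Tuple3<DateTime, String, String> map(String line) {
            String[] fields = line.split(",");
            DateTime ts = DateTime.parse(fields[0]);
            return new Tuple3<DateTime, String, String>(ts, ts.toString("yyyy-MM-dd"), fields[1]);
        }
    })
    // Assign event-time timestamps and watermarks that tolerate some out-of-orderness
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<DateTime, String, String>>(Time.minutes(10)) {
            @Override
            public long extractTimestamp(Tuple3<DateTime, String, String> event) {
                return event.f0.getMillis();
            }
        });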

Requirements:
1.  Get a count of the unique users by day (the expected result for the
sample above is sketched below).
2.  Early results should be emitted as quickly as possible (I've been
trying 30/60 second windows).
3.  Events are accepted up to 2 days late.
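
For the sample stream above, all three distinct userids fall on 2015-12-02,
so the expected daily result (ignoring the intermediate early firings) would
be a single record along the lines of:

(2015-12-02, 3)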

I've used the following as guidance:

EventTimeTriggerWithEarlyAndLateFiring
https://raw.githubusercontent.com/kl0u/flink-examples/master/src/main/java/com/dataartisans/flinksolo/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java

Multi-window transformations
http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CBAY182-W870B521BDC5709973990D5ED9A0%40phx.gbl%3E

Summing over aggregate windows
http://stackoverflow.com/questions/36791816/how-to-declare-1-minute-tumbling-window

but I can't get the reduce/aggregation logic working correctly. Here's a
sample of how I have the windows set up, with a DataStream of Tuple3
containing timestamp, date-only string, and userid:

DataStream<Tuple3<DateTime, String, String>> uniqueLogins = logins
    .keyBy(1, 2)
    .timeWindow(Time.days(1))
    // EventTimeNoLog is the same as EventTimeTriggerWithEarlyAndLateFiring,
    // just with modified logging since it potentially fires 2-3x per event read in
    .trigger(EventTimeNoLog.create()
        .withEarlyFiringEvery(Time.seconds(60))
        .withLateFiringEvery(Time.seconds(600))
        .withAllowedLateness(Time.days(2)))
    // Reduce to the earliest timestamp for a given day for a user
    .reduce(new ReduceFunction<Tuple3<DateTime, String, String>>() {
        @Override
        public Tuple3<DateTime, String, String> reduce(Tuple3<DateTime, String, String> event1,
                                                       Tuple3<DateTime, String, String> event2) {
            // Keep whichever event carries the earlier timestamp
            return event1.f0.isBefore(event2.f0) ? event1 : event2;
        }
    });
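
Ignoring the repeated early/late firings, the intent is for uniqueLogins to
end up with one element per (date, user) combination. For the sample above
that would be three elements, each carrying that user's earliest timestamp
for the day (assuming a yyyy-MM-dd date-only field):

(2015-12-02T01:13:21.002Z, 2015-12-02, bc030136a91aa46eb436dcb28fa72fed)
(2015-12-02T00:00:00.000Z, 2015-12-02, 5dd63d9756a975d0f4be6a6856005381)
(2015-12-02T00:00:00.000Z, 2015-12-02, ccd72e4535c92c499bb66eea6f4f9aab)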

SingleOutputStreamOperator<Tuple2<String, Long>> window = uniqueLogins
    .timeWindowAll(Time.days(1))
    // Modified EventTimeTriggerWithEarlyAndLateFiring that does a fire_and_purge
    // in onProcessingTime when aggregator is set
    .trigger(EventTimeTriggerWithEarlyAndLateFiring.create()
        .withEarlyFiringEvery(Time.seconds(60))
        .withLateFiringEvery(Time.seconds(600))
        .withAllowedLateness(Time.days(2))
        .aggregator())
    // Manually count the elements in the window
    .apply(new AllWindowFunction<Tuple3<DateTime, String, String>, Tuple2<String, Long>, TimeWindow>() {
        @Override
        public void apply(TimeWindow window,
                          Iterable<Tuple3<DateTime, String, String>> input,
                          Collector<Tuple2<String, Long>> out) throws Exception {
            long count = 0;
            String windowTime = null;

            for (Tuple3<DateTime, String, String> login : input) {
                windowTime = login.f1;
                count++;
            }
            out.collect(new Tuple2<String, Long>(windowTime, count));
        }
    });

From the logging I've put in place, it seems there is a performance issue
with the first keyBy: there is now a unique window for each date/user
combination (around 500k windows in my sample data), and when reducing,
results are not emitted at a constant enough rate for the second window to
perform its aggregation at a schedulable interval. Is there a better
approach to performing this type of calculation directly in Flink?
