I've been working through the Flink demo applications and have started on a prototype, but I've run into an issue with how to approach getting a daily unique user count from a traffic stream. I'm using the event-time time characteristic.
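For reference, here is a minimal plain-Java sketch (outside Flink, names are illustrative) of how each `timestamp,userid` line is parsed into the (timestamp, date-only, userid) triple that the Tuple3 stream below carries:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: parse "2015-12-02T01:13:21.002Z,bc03..." into
// the {timestamp, date-only, userid} fields used for the Tuple3 stream.
class LoginParser {
    private static final DateTimeFormatter DATE_ONLY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    public static String[] parse(String line) {
        String[] parts = line.split(",", 2);
        Instant ts = Instant.parse(parts[0]);    // ISO-8601 event timestamp
        String dateOnly = DATE_ONLY.format(ts);  // e.g. "2015-12-02"
        return new String[] { parts[0], dateOnly, parts[1] };
    }
}
```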
Sample event stream (timestamp,userid):

    2015-12-02T01:13:21.002Z,bc030136a91aa46eb436dcb28fa72fed
    2015-12-02T01:13:21.003Z,bc030136a91aa46eb436dcb28fa72fed
    2015-12-02T01:37:48.003Z,bc030136a91aa46eb436dcb28fa72fed
    2015-12-02T01:37:48.004Z,bc030136a91aa46eb436dcb28fa72fed
    2015-12-02T02:02:15.004Z,bc030136a91aa46eb436dcb28fa72fed
    2015-12-02T00:00:00.000Z,5dd63d9756a975d0f4be6a6856005381
    2015-12-02T00:16:58.000Z,5dd63d9756a975d0f4be6a6856005381
    2015-12-02T00:00:00.000Z,ccd72e4535c92c499bb66eea6f4f9aab
    2015-12-02T00:14:56.000Z,ccd72e4535c92c499bb66eea6f4f9aab
    2015-12-02T00:14:56.001Z,ccd72e4535c92c499bb66eea6f4f9aab
    2015-12-02T00:29:52.001Z,ccd72e4535c92c499bb66eea6f4f9aab
    2015-12-02T00:29:52.002Z,ccd72e4535c92c499bb66eea6f4f9aab
    2015-12-02T00:44:48.002Z,ccd72e4535c92c499bb66eea6f4f9aab

Requirements:

1. Get a count of the unique users by day.
2. Early results should be emitted as quickly as possible. (I've been trying to use 30/60-second early-firing intervals.)
3. Events are accepted up to 2 days late.

I've used the following as guidance:

EventTimeTriggerWithEarlyAndLateFiring
https://raw.githubusercontent.com/kl0u/flink-examples/master/src/main/java/com/dataartisans/flinksolo/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java

Multi-window transformations
http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CBAY182-W870B521BDC5709973990D5ED9A0%40phx.gbl%3E

Summing over aggregate windows
http://stackoverflow.com/questions/36791816/how-to-declare-1-minute-tumbling-window

but I can't get the reduce/aggregation logic working correctly.
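To make requirement 1 concrete, here is an illustrative plain-Java batch version (no Flink) of the daily-unique-user semantics I'm after; all three user ids in the sample fall on 2015-12-02:

```java
import java.util.*;

// Batch sketch of requirement 1: count distinct user ids per day.
class DailyUniques {
    // Each event is {timestamp, date-only, userid}, as in the Tuple3 stream.
    public static Map<String, Integer> count(List<String[]> events) {
        Map<String, Set<String>> usersPerDay = new HashMap<>();
        for (String[] e : events) {
            usersPerDay.computeIfAbsent(e[1], d -> new HashSet<>()).add(e[2]);
        }
        Map<String, Integer> counts = new HashMap<>();
        usersPerDay.forEach((day, users) -> counts.put(day, users.size()));
        return counts;
    }
}
```

The streaming pipeline below is meant to produce the same per-day numbers, just incrementally via early firings.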
Here's a sample of how I have the windows set up, with a DataStream of Tuple3<timestamp, date-only, userid>:

    DataStream<Tuple3<DateTime, String, String>> uniqueLogins = logins
        .keyBy(1, 2)
        .timeWindow(Time.days(1))
        // Same as EventTimeTriggerWithEarlyAndLateFiring, just with modified
        // logging, since it fires potentially 2-3x per event read in
        .trigger(EventTimeNoLog.create()
            .withEarlyFiringEvery(Time.seconds(60))
            .withLateFiringEvery(Time.seconds(600))
            .withAllowedLateness(Time.days(2)))
        // Reduce to the earliest timestamp for a given day for a user
        .reduce(new ReduceFunction<Tuple3<DateTime, String, String>>() {
            public Tuple3<DateTime, String, String> reduce(
                    Tuple3<DateTime, String, String> event1,
                    Tuple3<DateTime, String, String> event2) {
                return event1;
            }
        });

    SingleOutputStreamOperator<Tuple2<String, Long>> window = uniqueLogins
        .timeWindowAll(Time.days(1))
        // Modified EventTimeTriggerWithEarlyAndLateFiring that does a
        // FIRE_AND_PURGE in onProcessingTime when aggregator() is set
        .trigger(EventTimeTriggerWithEarlyAndLateFiring.create()
            .withEarlyFiringEvery(Time.seconds(60))
            .withLateFiringEvery(Time.seconds(600))
            .withAllowedLateness(Time.days(2))
            .aggregator())
        // Manually count
        .apply(new AllWindowFunction<Tuple3<DateTime, String, String>, Tuple2<String, Long>, TimeWindow>() {
            @Override
            public void apply(TimeWindow window,
                              Iterable<Tuple3<DateTime, String, String>> input,
                              Collector<Tuple2<String, Long>> out) throws Exception {
                int count = 0;
                String windowTime = null;
                for (Tuple3<DateTime, String, String> login : input) {
                    windowTime = login.f1;
                    count++;
                }
                out.collect(new Tuple2<String, Long>(windowTime, new Long(count)));
            }
        });

From the logging I've put in place, it seems there is a performance issue with the first keyBy: there is now a unique window for each date/user combination (around 500k windows in my sample data), and when reducing, results are not emitted at a constant enough rate for the second window to perform its aggregation on a schedulable interval.
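For clarity, the two-stage logic I'm trying to express can be written in plain Java as a batch sketch (stage 1 mirrors the keyBy(1,2) + reduce, stage 2 mirrors the timeWindowAll + apply) — this is only the intended semantics, not Flink code:

```java
import java.util.*;

class TwoStage {
    // Stage 1: keep the earliest event per (date, user) key, mirroring the
    // keyBy(1,2).reduce(...) deduplication. Events are {timestamp, date, userid};
    // ISO-8601 timestamps compare correctly as strings.
    public static Map<String, String[]> dedupe(List<String[]> events) {
        Map<String, String[]> earliest = new HashMap<>();
        for (String[] e : events) {
            String key = e[1] + "|" + e[2];
            earliest.merge(key, e, (a, b) -> a[0].compareTo(b[0]) <= 0 ? a : b);
        }
        return earliest;
    }

    // Stage 2: count surviving records per date, mirroring the
    // timeWindowAll(...).apply(...) manual count.
    public static Map<String, Long> countPerDay(Collection<String[]> deduped) {
        Map<String, Long> counts = new HashMap<>();
        for (String[] e : deduped) {
            counts.merge(e[1], 1L, Long::sum);
        }
        return counts;
    }
}
```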
Is there a better approach to performing this type of calculation directly in Flink?