Hi, is there a reason for keying on both the "date only" field and the "userid"? I think you should be fine by just keying on the userid and specifying that you want 1-day windows on your timestamps.
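Roughly something like this (just a sketch, reusing the Tuple3<DateTime, String, String> layout and the early/late-firing trigger from your quoted code below, and assuming the userid is field 2):

// Sketch only: key on the userid alone; the 1-day event-time window already
// groups events by calendar day, so the date-only field is not needed as a key.
DataStream<Tuple3<DateTime, String, String>> uniqueLogins = logins
    .keyBy(2) // userid
    .timeWindow(Time.days(1)) // one window per user per day
    .trigger(EventTimeTriggerWithEarlyAndLateFiring.create()
        .withEarlyFiringEvery(Time.seconds(60))
        .withLateFiringEvery(Time.seconds(600))
        .withAllowedLateness(Time.days(2)))
    .reduce(new ReduceFunction<Tuple3<DateTime, String, String>>() {
        @Override
        public Tuple3<DateTime, String, String> reduce(
                Tuple3<DateTime, String, String> event1,
                Tuple3<DateTime, String, String> event2) {
            // Keeping any single element per user per day is enough for the later count.
            return event1;
        }
    });

The downstream timeWindowAll count could stay as it is; this only drops the redundant date key, since the daily window already partitions the events by day.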
Also, do you have a timestamp extractor in place that takes the timestamp from your data and sets it as the internal timestamp field? This is explained in more detail here: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_timestamps_watermarks.html#timestamp-assigners--watermark-generators (there is a rough sketch of such an assigner after your quoted message below).

Cheers,
Aljoscha

On Thu, 28 Apr 2016 at 06:04 Christopher Santiago <ch...@ninjametrics.com> wrote:

> I've been working through the Flink demo applications and started in on a
> prototype, but have run into an issue with how to approach the problem of
> getting a daily unique user count from a traffic stream. I'm using event
> time as the time characteristic.
>
> Sample event stream (timestamp,userid):
> 2015-12-02T01:13:21.002Z,bc030136a91aa46eb436dcb28fa72fed
> 2015-12-02T01:13:21.003Z,bc030136a91aa46eb436dcb28fa72fed
> 2015-12-02T01:37:48.003Z,bc030136a91aa46eb436dcb28fa72fed
> 2015-12-02T01:37:48.004Z,bc030136a91aa46eb436dcb28fa72fed
> 2015-12-02T02:02:15.004Z,bc030136a91aa46eb436dcb28fa72fed
> 2015-12-02T00:00:00.000Z,5dd63d9756a975d0f4be6a6856005381
> 2015-12-02T00:16:58.000Z,5dd63d9756a975d0f4be6a6856005381
> 2015-12-02T00:00:00.000Z,ccd72e4535c92c499bb66eea6f4f9aab
> 2015-12-02T00:14:56.000Z,ccd72e4535c92c499bb66eea6f4f9aab
> 2015-12-02T00:14:56.001Z,ccd72e4535c92c499bb66eea6f4f9aab
> 2015-12-02T00:29:52.001Z,ccd72e4535c92c499bb66eea6f4f9aab
> 2015-12-02T00:29:52.002Z,ccd72e4535c92c499bb66eea6f4f9aab
> 2015-12-02T00:44:48.002Z,ccd72e4535c92c499bb66eea6f4f9aab
>
> Requirements:
> 1. Get a count of the unique users by day.
> 2. Early results should be emitted as quickly as possible. (I've been
> trying to use 30/60-second windows.)
> 3. Events are accepted up to 2 days late.
>
> I've used the following as guidance:
>
> EventTimeTriggerWithEarlyAndLateFiring
> https://raw.githubusercontent.com/kl0u/flink-examples/master/src/main/java/com/dataartisans/flinksolo/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java
>
> Multi-window transformations
> http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CBAY182-W870B521BDC5709973990D5ED9A0%40phx.gbl%3E
>
> Summing over aggregate windows
> http://stackoverflow.com/questions/36791816/how-to-declare-1-minute-tumbling-window
>
> but I can't get the reduce/aggregation logic working correctly.
> Here's a sample of how I have the windows set up, with a DataStream of
> Tuple3 containing timestamp, date only, userid:
>
> DataStream<Tuple3<DateTime, String, String>> uniqueLogins = logins
>     .keyBy(1, 2)
>     .timeWindow(Time.days(1))
>     // Same as EventTimeTriggerWithEarlyAndLateFiring, just with modified logging
>     // since it potentially fires 2-3x per event read in
>     .trigger(EventTimeNoLog.create()
>         .withEarlyFiringEvery(Time.seconds(60))
>         .withLateFiringEvery(Time.seconds(600))
>         .withAllowedLateness(Time.days(2)))
>     // Reduce to the earliest timestamp for a given day for a user
>     .reduce(new ReduceFunction<Tuple3<DateTime, String, String>>() {
>         public Tuple3<DateTime, String, String> reduce(
>                 Tuple3<DateTime, String, String> event1,
>                 Tuple3<DateTime, String, String> event2) {
>             return event1;
>         }
>     });
>
> SingleOutputStreamOperator<Tuple2<String, Long>> window = uniqueLogins
>     .timeWindowAll(Time.days(1))
>     // Modified EventTimeTriggerWithEarlyAndLateFiring that does a FIRE_AND_PURGE
>     // in onProcessingTime when aggregator is set
>     .trigger(EventTimeTriggerWithEarlyAndLateFiring.create()
>         .withEarlyFiringEvery(Time.seconds(60))
>         .withLateFiringEvery(Time.seconds(600))
>         .withAllowedLateness(Time.days(2))
>         .aggregator())
>     // Manually count
>     .apply(new AllWindowFunction<Tuple3<DateTime, String, String>,
>             Tuple2<String, Long>, TimeWindow>() {
>         @Override
>         public void apply(TimeWindow window,
>                 Iterable<Tuple3<DateTime, String, String>> input,
>                 Collector<Tuple2<String, Long>> out) throws Exception {
>             int count = 0;
>             String windowTime = null;
>             for (Tuple3<DateTime, String, String> login : input) {
>                 windowTime = login.f1;
>                 count++;
>             }
>             out.collect(new Tuple2<String, Long>(windowTime, new Long(count)));
>         }
>     });
>
> From the logging I've put in place, it seems that there is a performance
> issue with the first keyBy, where there is now a unique window for each
> date/user combination (in my sample data around 500k windows), which when
> reducing is not emitting results at a constant enough rate for the second
> window to perform its aggregation at a schedulable interval. Is there a
> better approach to performing this type of calculation directly in Flink?
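Regarding the timestamp assigner mentioned above, here is a rough sketch of a periodic assigner (the class name is made up; it assumes f0 of your Tuple3 is a Joda-Time DateTime and that events can arrive up to 2 days out of order):

// Sketch of a periodic timestamp/watermark assigner for the login stream.
public class LoginTimestampAssigner
        implements AssignerWithPeriodicWatermarks<Tuple3<DateTime, String, String>> {

    private static final long MAX_LATENESS_MS = 2L * 24 * 60 * 60 * 1000; // 2 days

    private long maxTimestampSeen = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(Tuple3<DateTime, String, String> element,
                                 long previousElementTimestamp) {
        long ts = element.f0.getMillis();
        maxTimestampSeen = Math.max(maxTimestampSeen, ts);
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Let the watermark trail the highest timestamp seen so far by the
        // allowed lateness, so late events still fall into an open daily window.
        return new Watermark(maxTimestampSeen == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestampSeen - MAX_LATENESS_MS);
    }
}

// Applied before any keyBy/window, with event time enabled on the environment:
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// DataStream<Tuple3<DateTime, String, String>> logins =
//     rawLogins.assignTimestampsAndWatermarks(new LoginTimestampAssigner());

Holding the watermark back by the full 2 days is only one option; since your custom trigger already handles allowed lateness, you could also emit tighter watermarks and rely on the trigger's late firings instead.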