Hi,
is there a reason for keying on both the "date only" field and the
"userid"? I think you should be fine just keying on the "userid" and
specifying that you want 1-day windows on your timestamps.
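
To illustrate why the separate date key should be redundant, here is a small plain-Java sketch (not Flink API; the class and method names are made up for illustration). It assumes that a 1-day tumbling window is aligned to the epoch, so the window an event falls into is already determined by its timestamp, and it counts distinct user ids per such bucket:

```java
import java.time.Instant;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical illustration class, not part of Flink.
class DailyUniquesSketch {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    // Start of the epoch-aligned 1-day bucket that contains the timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, DAY_MS);
    }

    // Counts distinct user ids per day bucket for lines of the form
    // "timestamp,userid", where the timestamp is ISO-8601 in UTC.
    static Map<Long, Integer> uniqueUsersPerDay(String[] events) {
        Map<Long, Set<String>> usersByDay = new TreeMap<>();
        for (String event : events) {
            String[] parts = event.split(",", 2);
            long ts = Instant.parse(parts[0]).toEpochMilli();
            usersByDay
                .computeIfAbsent(windowStart(ts), k -> new HashSet<>())
                .add(parts[1]);
        }
        Map<Long, Integer> counts = new TreeMap<>();
        usersByDay.forEach((day, users) -> counts.put(day, users.size()));
        return counts;
    }

    public static void main(String[] args) {
        // A few lines taken from the sample event stream in the question.
        String[] events = {
            "2015-12-02T01:13:21.002Z,bc030136a91aa46eb436dcb28fa72fed",
            "2015-12-02T01:13:21.003Z,bc030136a91aa46eb436dcb28fa72fed",
            "2015-12-02T00:00:00.000Z,5dd63d9756a975d0f4be6a6856005381",
            "2015-12-02T00:14:56.000Z,ccd72e4535c92c499bb66eea6f4f9aab"
        };
        uniqueUsersPerDay(events).forEach((day, n) ->
            System.out.println(Instant.ofEpochMilli(day) + " -> " + n));
    }
}
```

The date never appears as a key here: the day bucket falls out of the timestamp alone, which is what a 1-day event-time window would give you.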

Also, do you have a timestamp extractor in place that takes the timestamp
from your data and sets it as the internal timestamp field? This is
explained in more detail here:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_timestamps_watermarks.html#timestamp-assigners--watermark-generators
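
As a rough sketch of what such an extractor has to do for your "timestamp,userid" lines, here is the logic in plain Java. This is not the Flink interface itself; the class name, the method names and the bounded out-of-orderness watermark strategy are assumptions for illustration, and the real interfaces are described on the page linked above:

```java
import java.time.Instant;

// Hypothetical illustration class, not part of Flink.
class TimestampExtractionSketch {
    // Assumed bound on how far events may arrive out of order.
    private final long maxOutOfOrdernessMs;
    // Largest event timestamp seen so far.
    private long maxSeenMs = Long.MIN_VALUE;

    TimestampExtractionSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Parses the ISO-8601 timestamp in front of the first comma and returns
    // it as epoch milliseconds, remembering the largest value seen.
    long extractTimestamp(String event) {
        String iso = event.substring(0, event.indexOf(','));
        long ts = Instant.parse(iso).toEpochMilli();
        maxSeenMs = Math.max(maxSeenMs, ts);
        return ts;
    }

    // Watermark that trails the largest seen timestamp by the allowed
    // out-of-orderness; Long.MIN_VALUE until the first event arrives.
    long currentWatermark() {
        return maxSeenMs == Long.MIN_VALUE
            ? Long.MIN_VALUE
            : maxSeenMs - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        TimestampExtractionSketch extractor = new TimestampExtractionSketch(1000L);
        long ts = extractor.extractTimestamp(
            "2015-12-02T01:13:21.002Z,bc030136a91aa46eb436dcb28fa72fed");
        System.out.println(ts + " / watermark " + extractor.currentWatermark());
    }
}
```

Without something along these lines, event-time windows have no timestamps or watermarks to work with, so they would not fire when you expect them to.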

Cheers,
Aljoscha

On Thu, 28 Apr 2016 at 06:04 Christopher Santiago <ch...@ninjametrics.com>
wrote:

> I've been working through the Flink demo applications and started in on a
> prototype, but have run into an issue with how to approach the problem of
> getting a daily unique user count from a traffic stream.  I'm using event
> time as the time characteristic.
>
> Sample event stream
> (timestamp,userid):
> 2015-12-02T01:13:21.002Z,bc030136a91aa46eb436dcb28fa72fed
> 2015-12-02T01:13:21.003Z,bc030136a91aa46eb436dcb28fa72fed
> 2015-12-02T01:37:48.003Z,bc030136a91aa46eb436dcb28fa72fed
> 2015-12-02T01:37:48.004Z,bc030136a91aa46eb436dcb28fa72fed
> 2015-12-02T02:02:15.004Z,bc030136a91aa46eb436dcb28fa72fed
> 2015-12-02T00:00:00.000Z,5dd63d9756a975d0f4be6a6856005381
> 2015-12-02T00:16:58.000Z,5dd63d9756a975d0f4be6a6856005381
> 2015-12-02T00:00:00.000Z,ccd72e4535c92c499bb66eea6f4f9aab
> 2015-12-02T00:14:56.000Z,ccd72e4535c92c499bb66eea6f4f9aab
> 2015-12-02T00:14:56.001Z,ccd72e4535c92c499bb66eea6f4f9aab
> 2015-12-02T00:29:52.001Z,ccd72e4535c92c499bb66eea6f4f9aab
> 2015-12-02T00:29:52.002Z,ccd72e4535c92c499bb66eea6f4f9aab
> 2015-12-02T00:44:48.002Z,ccd72e4535c92c499bb66eea6f4f9aab
>
> Requirements:
> 1.  Get a count of the unique users by day.
> 2.  Early results should be emitted as quickly as possible.  (I've been
> trying to use 30/60 seconds windows)
> 3.  Events are accepted up to 2 days late.
>
> I've used the following as guidance:
>
> EventTimeTriggerWithEarlyAndLateFiring
>
> https://raw.githubusercontent.com/kl0u/flink-examples/master/src/main/java/com/dataartisans/flinksolo/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java
>
> Multi-window transformations
>
> http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CBAY182-W870B521BDC5709973990D5ED9A0%40phx.gbl%3E
>
> Summing over aggregate windows
>
> http://stackoverflow.com/questions/36791816/how-to-declare-1-minute-tumbling-window
>
> but I can't get the reduce/aggregation logic working correctly.  Here's a
> sample of how I have the windows set up, with a DataStream of Tuple3 with
> timestamp, date only, userid:
>
> DataStream<Tuple3<DateTime, String, String>> uniqueLogins = logins
>     .keyBy(1, 2)
>     .timeWindow(Time.days(1))
>     // EventTimeNoLog: same as EventTimeTriggerWithEarlyAndLateFiring, just
>     // modified logging since it is potentially 2-3x per event read in
>     .trigger(EventTimeNoLog.create()
>         .withEarlyFiringEvery(Time.seconds(60))
>         .withLateFiringEvery(Time.seconds(600))
>         .withAllowedLateness(Time.days(2)))
>     // Reduce to earliest timestamp for a given day for a user
>     .reduce(new ReduceFunction<Tuple3<DateTime, String, String>>() {
>         public Tuple3<DateTime, String, String> reduce(
>                 Tuple3<DateTime, String, String> event1,
>                 Tuple3<DateTime, String, String> event2) {
>             return event1;
>         }
>     });
>
> SingleOutputStreamOperator<Tuple2<String, Long>> window = uniqueLogins
>     .timeWindowAll(Time.days(1))
>     // Modified EventTimeTriggerWithEarlyAndLateFiring that does a
>     // fire_and_purge on onProcessingTime when aggregator is set
>     .trigger(EventTimeTriggerWithEarlyAndLateFiring.create()
>         .withEarlyFiringEvery(Time.seconds(60))
>         .withLateFiringEvery(Time.seconds(600))
>         .withAllowedLateness(Time.days(2))
>         .aggregator())
>     // Manually count
>     .apply(new AllWindowFunction<Tuple3<DateTime, String, String>,
>             Tuple2<String, Long>, TimeWindow>() {
>         @Override
>         public void apply(TimeWindow window,
>                 Iterable<Tuple3<DateTime, String, String>> input,
>                 Collector<Tuple2<String, Long>> out) throws Exception {
>             int count = 0;
>             String windowTime = null;
>
>             for (Tuple3<DateTime, String, String> login : input) {
>                 windowTime = login.f1;
>                 count++;
>             }
>             out.collect(new Tuple2<String, Long>(windowTime, new Long(count)));
>         }
>     });
>
> From the logging I've put in place, it seems that there is a performance
> issue with the first keyBy, where there is now a unique window for each
> date/user combination (in my sample data around 500k windows). When
> reducing, it is not emitting results at a constant enough rate for the
> second window to perform its aggregation at a schedulable interval.  Is
> there a better approach to performing this type of calculation directly in
> Flink?
>
