Hi guys,
I have the following code:

SingleOutputStreamOperator<Event> lastUserSession = env
        .socketTextStream("localhost",9000,"\n")
        .map(new MapFunction<String, Event>() {
            @Override public Event map(String value)throws Exception {
                String[] row = value.split(",");
                return new Event(Long.valueOf(row[0]), row[1], 
Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
            }
        })
        .assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
            @Override public long extractTimestamp(Event element) {
                return element.timestamp;
            }
        })
        .keyBy("userId","sessionId")
        .window(TumblingEventTimeWindows.of(Time.seconds(60)))
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
        .maxBy("length",false);

lastUserSession
        .timeWindowAll(Time.seconds(60))
        .aggregate(new AverageSessionLengthAcrossAllUsers())
        .print();

What i'm trying to achieve is to calculate the average session length every 10 
seconds. The problem is that once the window length is 60 seconds and a 
computation is triggered
every 10 seconds i will receive duplicate events in my average calculation 
method so the average will not be correct. If i move 
ContinuousProcessingTimeTrigger down before
AverageSessionLengthAcrossAllUsers() then it's not triggering every 10 seconds.
Any other suggestions how to workaround this?

Thanks

Reply via email to