Hey Plamen, I think what you are looking for is the AggregateFunction. You can use it on keyed streams. The Javadoc [1] contains an example for your use case (averaging).
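As a rough illustration of that averaging pattern, here is a plain-Java sketch. It is not the Javadoc example itself. It assumes the Flink 1.4+ shape of `AggregateFunction<IN, ACC, OUT>`: `createAccumulator`, `add` returning the accumulator, `getResult` and `merge`. It declares a local stand-in interface so it runs without Flink on the classpath. The names `Aggregate`, `SumCount`, `AverageLength` and `average` are hypothetical. In a real job you would implement `org.apache.flink.api.common.functions.AggregateFunction` itself and pass it to `aggregate(...)` on a windowed stream.

```java
import java.util.Arrays;
import java.util.List;

public class AverageAggregateSketch {

    // Local stand-in with the same method shape as Flink's AggregateFunction<IN, ACC, OUT>
    // (assumed Flink 1.4+ signature); declared here only so the sketch runs without Flink.
    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Hypothetical accumulator holding a running sum and count.
    static final class SumCount {
        final long sum;
        final long count;

        SumCount(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // Averages session lengths; the cast to double avoids integer division.
    static final class AverageLength implements Aggregate<Long, SumCount, Double> {
        public SumCount createAccumulator() {
            return new SumCount(0L, 0L);
        }

        public SumCount add(Long length, SumCount acc) {
            return new SumCount(acc.sum + length, acc.count + 1);
        }

        public Double getResult(SumCount acc) {
            return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;
        }

        public SumCount merge(SumCount a, SumCount b) {
            return new SumCount(a.sum + b.sum, a.count + b.count);
        }
    }

    // Feeds values one by one into the accumulator, roughly as a window would.
    static double average(List<Long> lengths) {
        AverageLength fn = new AverageLength();
        SumCount acc = fn.createAccumulator();
        for (Long length : lengths) {
            acc = fn.add(length, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        // Final session lengths from the example in the question: 160 and 100.
        System.out.println(average(Arrays.asList(160L, 100L))); // prints 130.0
    }
}
```

The accumulator is just a sum and a count. The window therefore only has to keep that pair, updating it with `add` as elements arrive. It does not buffer every element the way the `AllWindowFunction` in the question does. `merge` combines two accumulators, for example when session windows merge.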
– Ufuk

[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java

On Fri, Dec 15, 2017 at 11:55 AM, Plamen Paskov <plamen.pas...@next-stream.com> wrote:
> Hi,
>
> I'm trying to calculate the running average of session length, and I want to
> trigger the computation at a regular interval, let's say every 2 minutes. I'm
> trying to do it like this:
>
> package flink;
>
> import lombok.AllArgsConstructor;
> import lombok.NoArgsConstructor;
> import lombok.ToString;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> import java.sql.Timestamp;
> import java.time.Instant;
> import java.time.LocalDateTime;
> import java.util.TimeZone;
>
> public class StreamingJob {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         SingleOutputStreamOperator<Event> sessions = env
>                 .socketTextStream("localhost", 9000, "\n")
>                 .map(new MapFunction<String, Event>() {
>                     @Override
>                     public Event map(String value) throws Exception {
>                         String[] row = value.split(",");
>                         return new Event(Long.valueOf(row[0]), row[1],
>                                 Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
>                     }
>                 })
>                 .assignTimestampsAndWatermarks(
>                         new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
>                     @Override
>                     public long extractTimestamp(Event element) {
>                         return element.timestamp;
>                     }
>                 })
>                 .keyBy("userId", "sessionId")
>                 .maxBy("length");
>
>         sessions
>                 .timeWindowAll(Time.seconds(60), Time.seconds(30))
>                 .apply(new AllWindowFunction<Event, Avg, TimeWindow>() {
>                     @Override
>                     public void apply(TimeWindow window, Iterable<Event> values,
>                                       Collector<Avg> out) throws Exception {
>                         long sum = 0;
>                         int count = 0;
>
>                         for (Event event : values) {
>                             sum += event.length;
>                             count++;
>                         }
>
>                         double avg = (double) sum / count; // cast avoids integer division
>                         LocalDateTime windowStart = LocalDateTime.ofInstant(
>                                 Instant.ofEpochMilli(window.getStart()), TimeZone.getDefault().toZoneId());
>                         LocalDateTime windowEnd = LocalDateTime.ofInstant(
>                                 Instant.ofEpochMilli(window.getEnd()), TimeZone.getDefault().toZoneId());
>                         out.collect(new Avg(avg, windowStart.toString(), windowEnd.toString()));
>                     }
>                 });
>
>         env.execute();
>     }
>
>     @AllArgsConstructor
>     @NoArgsConstructor
>     @ToString
>     public static class Avg {
>         public double length;
>         public String windowStart;
>         public String windowEnd;
>     }
>
>     @AllArgsConstructor
>     @NoArgsConstructor
>     @ToString
>     public static class Event {
>         public long userId;
>         public String sessionId;
>         public long length;
>         public long timestamp;
>     }
> }
>
> First, I want to extract the last session event for every user-session,
> because it contains the total session length. Then I want to calculate the
> average session length based on the data from the previous operation (the
> sessions variable).
>
> Example:
>
> 1,s1,100,2017-12-13 11:58:01
> 1,s1,150,2017-12-13 11:58:02
> 1,s1,160,2017-12-13 11:58:03
> 2,s1,100,2017-12-13 11:58:04
>
> The sessions variable should contain these rows:
>
> 1,s1,160,2017-12-13 11:58:03
> 2,s1,100,2017-12-13 11:58:04
>
> but it only returns the max-length row for the corresponding event.
>
> Questions:
> - How do I collect the data for all groups in the sessions variable?
> - Is there another way to achieve this? With my implementation, the average
>   will be computed on a single node, because sessions is of type
>   SingleOutputStreamOperator<Event>.
> - Can I use ContinuousEventTimeTrigger to trigger at regular intervals?
>
> Thanks
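Regarding the `maxBy` output in the question: on a keyed stream, `maxBy` is a rolling aggregation. It emits an updated record for every incoming event, namely the maximum seen so far for that key, rather than a single final row per key. The plain-Java sketch below does not use Flink. It simulates that behavior on the four example rows and contrasts it with keeping only the latest maximum per (userId, sessionId) key. The `Event` class here is a trimmed, hypothetical copy of the question's class, and the helper names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RollingMaxBySketch {

    // Hypothetical, trimmed copy of the question's Event (timestamp omitted).
    static final class Event {
        final long userId;
        final String sessionId;
        final long length;

        Event(long userId, String sessionId, long length) {
            this.userId = userId;
            this.sessionId = sessionId;
            this.length = length;
        }

        // Composite key, standing in for keyBy("userId", "sessionId").
        String key() {
            return userId + "," + sessionId;
        }
    }

    // Simulates a rolling maxBy on a keyed stream: one output per input event,
    // carrying the maximum-length event seen so far for that event's key.
    static List<Event> rollingMaxBy(List<Event> input) {
        Map<String, Event> maxPerKey = new LinkedHashMap<>();
        List<Event> emitted = new ArrayList<>();
        for (Event e : input) {
            Event current = maxPerKey.get(e.key());
            if (current == null || e.length > current.length) {
                current = e;
                maxPerKey.put(e.key(), e);
            }
            emitted.add(current);
        }
        return emitted;
    }

    // Keeps only the final maximum per key, i.e. one row per user-session.
    static List<Event> latestMaxPerKey(List<Event> input) {
        Map<String, Event> maxPerKey = new LinkedHashMap<>();
        for (Event e : input) {
            maxPerKey.merge(e.key(), e,
                    (existing, incoming) -> incoming.length > existing.length ? incoming : existing);
        }
        return new ArrayList<>(maxPerKey.values());
    }

    // Plain average of session lengths (double division, not integer division).
    static double averageLength(List<Event> events) {
        long sum = 0;
        for (Event e : events) {
            sum += e.length;
        }
        return events.isEmpty() ? 0.0 : (double) sum / events.size();
    }

    public static void main(String[] args) {
        // The four example rows from the question.
        List<Event> input = Arrays.asList(
                new Event(1, "s1", 100),
                new Event(1, "s1", 150),
                new Event(1, "s1", 160),
                new Event(2, "s1", 100));

        List<Event> rolling = rollingMaxBy(input);
        List<Event> latest = latestMaxPerKey(input);
        System.out.println(rolling.size() + " rows, avg " + averageLength(rolling)); // prints "4 rows, avg 127.5"
        System.out.println(latest.size() + " rows, avg " + averageLength(latest));   // prints "2 rows, avg 130.0"
    }
}
```

Averaging the rolling output also counts the intermediate updates (100 and 150 for user 1), which is why the two averages differ. In a Flink job, one option would be to hold the per-key value in keyed state. Another would be to derive it per window. This sketch does not settle which fits best.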