Hey Plamen,

I think what you are looking for is the AggregateFunction, which you
can use on keyed streams. The Javadoc [1] contains an example for your
use case (averaging).
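
As a rough sketch of the shape such a function takes for averaging: the
accumulator holds a running sum and count, and merge combines partial
results. The Aggregate interface below is a hypothetical local stand-in
that mirrors the four methods of Flink's AggregateFunction so the snippet
compiles without Flink on the classpath; SumCount, AverageLength and
AverageDemo are names invented for this example. In a real job you would
implement Flink's own interface instead and pass it to aggregate() on a
windowed stream.

```java
// Hypothetical stand-in mirroring the methods of Flink's
// org.apache.flink.api.common.functions.AggregateFunction.
interface Aggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Accumulator: running sum and number of values seen so far.
final class SumCount {
    long sum;
    long count;
}

// Averages session lengths (assumed here to be plain Long values).
final class AverageLength implements Aggregate<Long, SumCount, Double> {
    @Override
    public SumCount createAccumulator() {
        return new SumCount();
    }

    @Override
    public SumCount add(Long length, SumCount acc) {
        acc.sum += length;
        acc.count++;
        return acc;
    }

    @Override
    public Double getResult(SumCount acc) {
        // Cast before dividing to avoid integer division; empty input gives 0.0.
        return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;
    }

    @Override
    public SumCount merge(SumCount a, SumCount b) {
        // Partial results from different partitions are combined here.
        a.sum += b.sum;
        a.count += b.count;
        return a;
    }
}

class AverageDemo {
    public static void main(String[] args) {
        AverageLength f = new AverageLength();
        SumCount acc = f.createAccumulator();
        acc = f.add(160L, acc);
        acc = f.add(100L, acc);
        System.out.println(f.getResult(acc)); // prints 130.0
    }
}
```

Because only the sum and count are kept, the state per window stays small
and partial accumulators can be merged.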

– Ufuk

[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java

On Fri, Dec 15, 2017 at 11:55 AM, Plamen Paskov
<plamen.pas...@next-stream.com> wrote:
> Hi,
>
> I'm trying to calculate the running average of session length, and I want
> to trigger the computation at a regular interval, say every 2 minutes. I'm
> trying to do it like this:
>
> package flink;
>
> import lombok.AllArgsConstructor;
> import lombok.NoArgsConstructor;
> import lombok.ToString;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> import java.sql.Timestamp;
> import java.time.Instant;
> import java.time.LocalDateTime;
> import java.util.TimeZone;
>
>
> public class StreamingJob {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         SingleOutputStreamOperator<Event> sessions = env
>                 .socketTextStream("localhost", 9000, "\n")
>                 .map(new MapFunction<String, Event>() {
>                     @Override
>                     public Event map(String value) throws Exception {
>                         String[] row = value.split(",");
>                         return new Event(Long.valueOf(row[0]), row[1],
>                                 Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
>                     }
>                 })
>                 .assignTimestampsAndWatermarks(
>                         new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
>                     @Override
>                     public long extractTimestamp(Event element) {
>                         return element.timestamp;
>                     }
>                 })
>                 .keyBy("userId", "sessionId")
>                 .maxBy("length");
>
>
>         sessions
>                 .timeWindowAll(Time.seconds(60), Time.seconds(30))
>                 .apply(new AllWindowFunction<Event, Avg, TimeWindow>() {
>                     @Override
>                     public void apply(TimeWindow window, Iterable<Event> values,
>                             Collector<Avg> out) throws Exception {
>                         long sum = 0;
>                         int count = 0;
>
>                         for (Event event : values) {
>                             sum += event.length;
>                             count++;
>                         }
>
>                         // cast first: sum / count alone is integer division
>                         double avg = (double) sum / count;
>                         LocalDateTime windowStart = LocalDateTime.ofInstant(
>                                 Instant.ofEpochMilli(window.getStart()),
>                                 TimeZone.getDefault().toZoneId());
>                         LocalDateTime windowEnd = LocalDateTime.ofInstant(
>                                 Instant.ofEpochMilli(window.getEnd()),
>                                 TimeZone.getDefault().toZoneId());
>                         out.collect(new Avg(avg, windowStart.toString(),
>                                 windowEnd.toString()));
>                     }
>                 });
>
>         env.execute();
>     }
>
>     @AllArgsConstructor
>     @NoArgsConstructor
>     @ToString
>     public static class Avg {
>         public double length;
>         public String windowStart;
>         public String windowEnd;
>     }
>
>     @AllArgsConstructor
>     @NoArgsConstructor
>     @ToString
>     public static class Event {
>         public long userId;
>         public String sessionId;
>         public long length;
>         public long timestamp;
>     }
> }
>
> First I want to extract the last session event for every user-session,
> because it contains the total session length. Then I want to calculate the
> average session length from the output of the previous operation (the
> sessions variable).
>
> Example:
>
> 1,s1,100,2017-12-13 11:58:01
> 1,s1,150,2017-12-13 11:58:02
> 1,s1,160,2017-12-13 11:58:03
> 2,s1,100,2017-12-13 11:58:04
>
> The sessions variable should contain these rows:
> 1,s1,160,2017-12-13 11:58:03
> 2,s1,100,2017-12-13 11:58:04
>
> but instead it returns the max-length row only for the key of the
> corresponding event.
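
The two steps described above can be sketched in plain Java, outside
Flink, on the example rows. This is only an illustration of the intended
result, not a Flink job: SessionAverage and its methods are names made up
for the example, and it assumes the length only grows within a session, so
the last event of a session is also the one with the largest length.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SessionAverage {
    // Step 1: keep one length per "userId,sessionId" key, namely the
    // largest one seen (assumed to be the length of the last event).
    static Map<String, Long> lastLengthPerSession(String[] rows) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String row : rows) {
            String[] fields = row.split(",");
            String key = fields[0] + "," + fields[1];
            result.merge(key, Long.parseLong(fields[2]), Long::max);
        }
        return result;
    }

    // Step 2: average over the per-session lengths from step 1.
    static double average(Map<String, Long> lengths) {
        long sum = 0;
        for (long value : lengths.values()) {
            sum += value;
        }
        return lengths.isEmpty() ? 0.0 : (double) sum / lengths.size();
    }

    public static void main(String[] args) {
        String[] rows = {
            "1,s1,100,2017-12-13 11:58:01",
            "1,s1,150,2017-12-13 11:58:02",
            "1,s1,160,2017-12-13 11:58:03",
            "2,s1,100,2017-12-13 11:58:04",
        };
        Map<String, Long> sessions = lastLengthPerSession(rows);
        System.out.println(sessions);          // prints {1,s1=160, 2,s1=100}
        System.out.println(average(sessions)); // prints 130.0
    }
}
```

For the four example rows this keeps 160 for user 1 and 100 for user 2,
which gives an average session length of 130.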
>
> Questions:
> - How can I collect the data for all groups in the sessions variable?
> - Is there another way to achieve this functionality? With my
> implementation the average will be computed on a single node, because
> sessions is of type SingleOutputStreamOperator<Event>.
> - Can I use ContinuousEventTimeTrigger to trigger at regular intervals?
>
> Thanks
