Hi,
I'm trying to calculate the running average of session length, and I want
to trigger the computation at a regular interval, say every 2 minutes.
This is what I have so far:
package flink;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.TimeZone;
public class StreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Parse each line, assign event-time timestamps, then keep the
        // max-length event per (userId, sessionId).
        SingleOutputStreamOperator<Event> sessions = env
            .socketTextStream("localhost", 9000, "\n")
            .map(new MapFunction<String, Event>() {
                @Override public Event map(String value) throws Exception {
                    String[] row = value.split(",");
                    return new Event(Long.valueOf(row[0]), row[1],
                        Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
                }
            })
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
                    @Override public long extractTimestamp(Event element) {
                        return element.timestamp;
                    }
                })
            .keyBy("userId", "sessionId")
            .maxBy("length");
        // Average the session lengths over a sliding 60 s window, every 30 s.
        sessions
            .timeWindowAll(Time.seconds(60), Time.seconds(30))
            .apply(new AllWindowFunction<Event, Avg, TimeWindow>() {
                @Override public void apply(TimeWindow window,
                        Iterable<Event> values, Collector<Avg> out) throws Exception {
                    long sum = 0;
                    int count = 0;
                    for (Event event : values) {
                        sum += event.length;
                        count++;
                    }
                    // Cast so the division is not truncated to a whole number.
                    double avg = (double) sum / count;
                    LocalDateTime windowStart = LocalDateTime.ofInstant(
                        Instant.ofEpochMilli(window.getStart()),
                        TimeZone.getDefault().toZoneId());
                    LocalDateTime windowEnd = LocalDateTime.ofInstant(
                        Instant.ofEpochMilli(window.getEnd()),
                        TimeZone.getDefault().toZoneId());
                    out.collect(new Avg(avg, windowStart.toString(),
                        windowEnd.toString()));
                }
            });
        env.execute();
    }
    @AllArgsConstructor @NoArgsConstructor @ToString public static class Avg {
        public double length;
        public String windowStart;
        public String windowEnd;
    }
    @AllArgsConstructor @NoArgsConstructor @ToString public static class Event {
        public long userId;
        public String sessionId;
        public long length;
        public long timestamp;
    }
}
First I want to extract the last event of every user-session, because it
contains the total session length. Then I want to calculate the average
session length from the output of that step (the sessions variable).
Example:
1,s1,100,2017-12-13 11:58:01
1,s1,150,2017-12-13 11:58:02
1,s1,160,2017-12-13 11:58:03
2,s1,100,2017-12-13 11:58:04
The sessions variable should contain these rows:
1,s1,160,2017-12-13 11:58:03
2,s1,100,2017-12-13 11:58:04
but it only returns the max-length row for the key of the event that just arrived.
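To make the expected result concrete, here is a minimal plain-Java sketch (no Flink) of the computation I am after, run on the example rows above. The class and method names (SessionAverage, maxLengthPerSession, average) are made up for illustration, and the sketch ignores timestamps, windows and watermarks entirely.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; these names are hypothetical, not Flink API.
final class SessionAverage {

    // Keeps the largest length seen for each (userId, sessionId) pair.
    static Map<String, Long> maxLengthPerSession(List<String> lines) {
        Map<String, Long> maxByKey = new LinkedHashMap<>();
        for (String line : lines) {
            String[] row = line.split(",");
            String key = row[0] + "|" + row[1]; // userId|sessionId
            long length = Long.parseLong(row[2]);
            maxByKey.merge(key, length, Long::max);
        }
        return maxByKey;
    }

    // Averages the per-session lengths using floating-point division.
    static double average(Collection<Long> lengths) {
        if (lengths.isEmpty()) {
            return 0.0;
        }
        long sum = 0;
        for (long length : lengths) {
            sum += length;
        }
        return (double) sum / lengths.size();
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "1,s1,100,2017-12-13 11:58:01",
            "1,s1,150,2017-12-13 11:58:02",
            "1,s1,160,2017-12-13 11:58:03",
            "2,s1,100,2017-12-13 11:58:04");
        Map<String, Long> perSession = maxLengthPerSession(lines);
        System.out.println(perSession);                   // {1|s1=160, 2|s1=100}
        System.out.println(average(perSession.values())); // 130.0
    }
}
```

So for these four rows I would expect one value per session (160 and 100) to go into the average, giving 130.0.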
Questions:
- How can I collect the data for all groups in the sessions variable?
- Is there another way to achieve this? With my implementation the average
will be computed on a single node, because sessions is of type
SingleOutputStreamOperator<Event>.
- Can I use ContinuousEventTimeTrigger to trigger at regular intervals?
Thanks