You have to specify a window for this to work:

    stream
        .keyBy(<key>)
        .timeWindow(<time>)
        .aggregate(<aggregateFunction>)

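For what it's worth, here is a rough sketch of the averaging logic that could be handed to aggregate(...) once the window is in place. It deliberately does not depend on Flink so that it runs on its own: the nested Aggregate interface is a hypothetical stand-in modeled on the four methods of Flink's AggregateFunction (createAccumulator, add, getResult, merge), and the exact signatures of the real interface can differ between Flink versions. SessionAverageSketch, SumCount and AverageLength are illustrative names only.

```java
import java.util.Arrays;
import java.util.List;

final class SessionAverageSketch {

    // Hypothetical stand-in for the contract of Flink's AggregateFunction,
    // declared locally so the sketch compiles without Flink on the classpath.
    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Accumulator: only the running sum and count are kept, not the events.
    static final class SumCount {
        long sum;
        long count;
    }

    // Averages session lengths incrementally, one element at a time.
    static final class AverageLength implements Aggregate<Long, SumCount, Double> {
        @Override
        public SumCount createAccumulator() {
            return new SumCount();
        }

        @Override
        public SumCount add(Long length, SumCount acc) {
            acc.sum += length;
            acc.count++;
            return acc;
        }

        @Override
        public Double getResult(SumCount acc) {
            // Cast before dividing so the result is not truncated to a whole number.
            return acc.count == 0 ? 0.0 : (double) acc.sum / acc.count;
        }

        @Override
        public SumCount merge(SumCount a, SumCount b) {
            a.sum += b.sum;
            a.count += b.count;
            return a;
        }
    }

    // Feeds a list of lengths through the aggregate, the way a window would.
    static double average(List<Long> lengths) {
        AverageLength fn = new AverageLength();
        SumCount acc = fn.createAccumulator();
        for (Long length : lengths) {
            acc = fn.add(length, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        // Final session lengths from the example in the thread: 160 and 100.
        System.out.println(average(Arrays.asList(160L, 100L))); // prints 130.0
    }
}
```

In an actual job, a class implementing Flink's own AggregateFunction with the same logic would be the argument to aggregate(...). The merge method matters for windows that can be merged, such as session windows.
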
On Fri, Dec 15, 2017 at 3:04 PM, Plamen Paskov
<plamen.pas...@next-stream.com> wrote:
> Hi Ufuk,
>
> Thanks for answer. It looks like in theory the accumulators are the solution
> to my problem but as i'm working on KeyedStream it's not possible to call
> aggregate with AggregateFunction implementation. Am i missing something?
>
>
> On 15.12.2017 15:46, Ufuk Celebi wrote:
>>
>> Hey Plamen,
>>
>> I think what you are looking for is the AggregateFunction. This you
>> can use on keyed streams. The Javadoc [1] contains an example for your
>> use case (averaging).
>>
>> – Ufuk
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
>>
>> On Fri, Dec 15, 2017 at 11:55 AM, Plamen Paskov
>> <plamen.pas...@next-stream.com> wrote:
>>>
>>> Hi,
>>>
>>> I'm trying to calculate the running average of session length and i want
>>> to trigger the computation on a regular let's say 2 minutes interval. I'm
>>> trying to do it like this:
>>>
>>> package flink;
>>>
>>> import lombok.AllArgsConstructor;
>>> import lombok.NoArgsConstructor;
>>> import lombok.ToString;
>>> import org.apache.flink.api.common.functions.MapFunction;
>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>>> import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>>> import org.apache.flink.util.Collector;
>>>
>>> import java.sql.Timestamp;
>>> import java.time.Instant;
>>> import java.time.LocalDateTime;
>>> import java.util.TimeZone;
>>>
>>>
>>> public class StreamingJob {
>>>     public static void main(String[] args) throws Exception {
>>>         StreamExecutionEnvironment env =
>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>>         SingleOutputStreamOperator<Event> sessions = env
>>>             .socketTextStream("localhost", 9000, "\n")
>>>             .map(new MapFunction<String, Event>() {
>>>                 @Override
>>>                 public Event map(String value) throws Exception {
>>>                     String[] row = value.split(",");
>>>                     return new Event(Long.valueOf(row[0]), row[1],
>>>                         Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
>>>                 }
>>>             })
>>>             .assignTimestampsAndWatermarks(new
>>>                 BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
>>>                 @Override
>>>                 public long extractTimestamp(Event element) {
>>>                     return element.timestamp;
>>>                 }
>>>             })
>>>             .keyBy("userId", "sessionId")
>>>             .maxBy("length");
>>>
>>>
>>>         sessions
>>>             .timeWindowAll(Time.seconds(60), Time.seconds(30))
>>>             .apply(new AllWindowFunction<Event, Avg, TimeWindow>() {
>>>                 @Override
>>>                 public void apply(TimeWindow window, Iterable<Event> values,
>>>                         Collector<Avg> out) throws Exception {
>>>                     long sum = 0;
>>>                     int count = 0;
>>>
>>>                     for (Event event : values) {
>>>                         sum += event.length;
>>>                         count++;
>>>                     }
>>>
>>>                     double avg = sum / count;
>>>                     LocalDateTime windowStart =
>>>                         LocalDateTime.ofInstant(Instant.ofEpochMilli(window.getStart()),
>>>                             TimeZone.getDefault().toZoneId());
>>>                     LocalDateTime windowEnd =
>>>                         LocalDateTime.ofInstant(Instant.ofEpochMilli(window.getEnd()),
>>>                             TimeZone.getDefault().toZoneId());
>>>                     out.collect(new Avg(avg, windowStart.toString(),
>>>                         windowEnd.toString()));
>>>                 }
>>>             });
>>>
>>>         env.execute();
>>>     }
>>>
>>>     @AllArgsConstructor
>>>     @NoArgsConstructor
>>>     @ToString
>>>     public static class Avg {
>>>         public double length;
>>>         public String windowStart;
>>>         public String windowEnd;
>>>     }
>>>
>>>     @AllArgsConstructor
>>>     @NoArgsConstructor
>>>     @ToString
>>>     public static class Event {
>>>         public long userId;
>>>         public String sessionId;
>>>         public long length;
>>>         public long timestamp;
>>>     }
>>> }
>>>
>>> First i want to extract the last session event for every user-session
>>> because it contains the total session length. Then i want to calculate
>>> the average session length based on the data from previous operation
>>> (based on the sessions variable).
>>>
>>> Example:
>>>
>>> 1,s1,100,2017-12-13 11:58:01
>>> 1,s1,150,2017-12-13 11:58:02
>>> 1,s1,160,2017-12-13 11:58:03
>>> 2,s1,100,2017-12-13 11:58:04
>>>
>>> sessions variable should contain those rows:
>>> 1,s1,160,2017-12-13 11:58:03
>>> 2,s1,100,2017-12-13 11:58:04
>>>
>>> but it's returning the max length row only for the corresponding event.
>>>
>>> Questions:
>>> - how to collect the data for all groups in sessions variable?
>>> - is there another way to achieve this functionality because with my
>>>   implementation the average will be computed on single node because
>>>   sessions is of type SingleOutputStreamOperator<Event>
>>> - can i use ContinuousEventTimeTrigger to trigger at regular intervals ?
>>>
>>> Thanks