You have to specify a window for this to work:

stream
  .keyBy(<key>)
  .timeWindow(<time>)
  .aggregate(<aggregateFunction>)
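As a rough illustration of what this chain computes, here is a plain-Java sketch (no Flink dependency, not Flink's API) that groups timestamped values by key and by tumbling window and produces one average per (key, window) pair. The `Evt` type, the `windowedAverage` method and the choice of an average as the aggregate are hypothetical. The window start is computed as `timestamp - timestamp % size`, assuming non-negative timestamps and no window offset.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of keyBy(key) + tumbling timeWindow(size) + aggregate(average). Not Flink code. */
class WindowedAverageSketch {

    /** Hypothetical input record: grouping key, numeric value, event time in milliseconds. */
    static final class Evt {
        final String key;
        final long value;
        final long timestamp;

        Evt(String key, long value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Start of the tumbling window containing ts (zero offset, non-negative ts assumed). */
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    /**
     * Returns "key@windowStart" -> average value: one result per key and window,
     * rather than one result per incoming event.
     */
    static Map<String, Double> windowedAverage(List<Evt> events, long windowSizeMs) {
        // One accumulator per (key, window): [sum, count]
        Map<String, long[]> acc = new TreeMap<>();
        for (Evt e : events) {
            String group = e.key + "@" + windowStart(e.timestamp, windowSizeMs);
            long[] sc = acc.computeIfAbsent(group, g -> new long[2]);
            sc[0] += e.value;
            sc[1] += 1;
        }
        Map<String, Double> result = new TreeMap<>();
        for (Map.Entry<String, long[]> entry : acc.entrySet()) {
            long[] sc = entry.getValue();
            // Cast before dividing to avoid integer division.
            result.put(entry.getKey(), (double) sc[0] / sc[1]);
        }
        return result;
    }
}
```

With a 2-minute window, events with the same key at 0 s and 60 s land in the window starting at 0 and are averaged together, while an event at 130 s starts a new window.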



On Fri, Dec 15, 2017 at 3:04 PM, Plamen Paskov
<plamen.pas...@next-stream.com> wrote:
> Hi Ufuk,
>
> Thanks for the answer. In theory, accumulators look like the solution
> to my problem, but since I'm working on a KeyedStream, I can't call
> aggregate with an AggregateFunction implementation. Am I missing something?
>
>
>
> On 15.12.2017 15:46, Ufuk Celebi wrote:
>>
>> Hey Plamen,
>>
>> I think what you are looking for is the AggregateFunction. You can use
>> it on keyed streams. The Javadoc [1] contains an example for your
>> use case (averaging).
>>
>> – Ufuk
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java
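The accumulator pattern Ufuk refers to can be sketched in plain Java: the accumulator keeps a running sum and count, and the average is only computed when the result is requested, so partial accumulators can also be merged. `AggregateLike` below is a hypothetical stand-in that mirrors the four methods of Flink's `AggregateFunction` (`createAccumulator`, `add`, `getResult`, `merge`); check the Javadoc in [1] for the exact signatures in your Flink version. In Flink itself, such a function is passed to `aggregate` on a windowed stream.

```java
/** Hypothetical stand-in mirroring the method shape of Flink's AggregateFunction<IN, ACC, OUT>. */
interface AggregateLike<IN, ACC, OUT> {
    ACC createAccumulator();

    ACC add(IN value, ACC accumulator);

    OUT getResult(ACC accumulator);

    ACC merge(ACC a, ACC b);
}

/** Average of long values; the accumulator is {sum, count}. */
class AverageAggregate implements AggregateLike<Long, long[], Double> {

    @Override
    public long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    @Override
    public long[] add(Long value, long[] acc) {
        acc[0] += value; // running sum
        acc[1] += 1;     // running count
        return acc;
    }

    @Override
    public Double getResult(long[] acc) {
        // Cast before dividing to avoid integer division; an empty accumulator yields 0.0.
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }

    @Override
    public long[] merge(long[] a, long[] b) {
        // Combine two partial accumulators, e.g. from different partial aggregations.
        a[0] += b[0];
        a[1] += b[1];
        return a;
    }
}
```

For example, adding 100 and 160 to a fresh accumulator gives a result of 130.0.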
>>
>> On Fri, Dec 15, 2017 at 11:55 AM, Plamen Paskov
>> <plamen.pas...@next-stream.com> wrote:
>>>
>>> Hi,
>>>
>>> I'm trying to calculate the running average of session length, and I want
>>> to trigger the computation at a regular interval of, say, 2 minutes. I'm
>>> trying to do it like this:
>>>
>>> package flink;
>>>
>>> import lombok.AllArgsConstructor;
>>> import lombok.NoArgsConstructor;
>>> import lombok.ToString;
>>> import org.apache.flink.api.common.functions.MapFunction;
>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>>> import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>>> import org.apache.flink.util.Collector;
>>>
>>> import java.sql.Timestamp;
>>> import java.time.Instant;
>>> import java.time.LocalDateTime;
>>> import java.util.TimeZone;
>>>
>>>
>>> public class StreamingJob {
>>>      public static void main(String[] args) throws Exception {
>>>          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>>          SingleOutputStreamOperator<Event> sessions = env
>>>                  .socketTextStream("localhost", 9000, "\n")
>>>                  .map(new MapFunction<String, Event>() {
>>>                      @Override
>>>                      public Event map(String value) throws Exception {
>>>                          // input line: userId,sessionId,length,yyyy-mm-dd hh:mm:ss
>>>                          String[] row = value.split(",");
>>>                          return new Event(Long.valueOf(row[0]), row[1],
>>>                                  Long.valueOf(row[2]), Timestamp.valueOf(row[3]).getTime());
>>>                      }
>>>                  })
>>>                  .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
>>>                      @Override
>>>                      public long extractTimestamp(Event element) {
>>>                          return element.timestamp;
>>>                      }
>>>                  })
>>>                  .keyBy("userId", "sessionId")
>>>                  .maxBy("length");
>>>
>>>
>>>          sessions
>>>                  .timeWindowAll(Time.seconds(60), Time.seconds(30))
>>>                  .apply(new AllWindowFunction<Event, Avg, TimeWindow>() {
>>>                      @Override
>>>                      public void apply(TimeWindow window, Iterable<Event> values,
>>>                                        Collector<Avg> out) throws Exception {
>>>                          long sum = 0;
>>>                          int count = 0;
>>>
>>>                          for (Event event : values) {
>>>                              sum += event.length;
>>>                              count++;
>>>                          }
>>>
>>>                          // Cast before dividing: sum / count on two integers truncates the average.
>>>                          double avg = (double) sum / count;
>>>                          LocalDateTime windowStart = LocalDateTime.ofInstant(
>>>                                  Instant.ofEpochMilli(window.getStart()), TimeZone.getDefault().toZoneId());
>>>                          LocalDateTime windowEnd = LocalDateTime.ofInstant(
>>>                                  Instant.ofEpochMilli(window.getEnd()), TimeZone.getDefault().toZoneId());
>>>                          out.collect(new Avg(avg, windowStart.toString(), windowEnd.toString()));
>>>                      }
>>>                  });
>>>
>>>          env.execute();
>>>      }
>>>
>>>      @AllArgsConstructor
>>>      @NoArgsConstructor
>>>      @ToString
>>>      public static class Avg {
>>>          public double length;
>>>          public String windowStart;
>>>          public String windowEnd;
>>>      }
>>>
>>>      @AllArgsConstructor
>>>      @NoArgsConstructor
>>>      @ToString
>>>      public static class Event {
>>>          public long userId;
>>>          public String sessionId;
>>>          public long length;
>>>          public long timestamp;
>>>      }
>>> }
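`timeWindowAll(Time.seconds(60), Time.seconds(30))` creates sliding windows that are 60 s long and start every 30 s, so each event is assigned to two overlapping windows and is counted in two averages. The plain-Java model below (hypothetical names, not Flink's `SlidingEventTimeWindows`) lists the windows that contain a given timestamp, assuming a zero window offset and non-negative timestamps.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of sliding-window assignment (size, slide) with zero offset. Not Flink code. */
class SlidingWindowSketch {

    /**
     * Returns the [start, end) bounds of every sliding window that contains ts,
     * latest start first. An event belongs to size / slide windows when size is a multiple of slide.
     */
    static List<long[]> assignWindows(long ts, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = ts - (ts % slide); // latest window start at or before ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

For an event at 45 s, it returns the windows [30 s, 90 s) and [0 s, 60 s).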
>>>
>>> First, I want to extract the last session event for every user session,
>>> because it contains the total session length. Then I want to calculate
>>> the average session length based on the output of the previous
>>> operation (the sessions variable).
>>>
>>> Example:
>>>
>>> 1,s1,100,2017-12-13 11:58:01
>>> 1,s1,150,2017-12-13 11:58:02
>>> 1,s1,160,2017-12-13 11:58:03
>>> 2,s1,100,2017-12-13 11:58:04
>>>
>>> The sessions variable should contain these rows:
>>> 1,s1,160,2017-12-13 11:58:03
>>> 2,s1,100,2017-12-13 11:58:04
>>>
>>> but instead it emits a row for every incoming event, containing only the
>>> current max-length row for that event's own user session.
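What is described here matches the behaviour of a rolling aggregation: `maxBy` on a `KeyedStream` emits an updated result for every incoming event, not one final row per key. The plain-Java sketch below contrasts the two on the example rows above. It works on a finite list, so "final" simply means after the last row; in a stream that point only exists once a window or trigger fires, which is why the reply at the top recommends a window. The `Row` type and the method names are hypothetical, and the CSV layout (userId, sessionId, length, timestamp) is taken from the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java contrast between a rolling maxBy and "one final row per key". Not Flink code. */
class RollingVsFinalSketch {

    /** Hypothetical row parsed from "userId,sessionId,length,timestamp". */
    static final class Row {
        final long userId;
        final String sessionId;
        final long length;
        final String timestamp;

        Row(String csv) {
            String[] f = csv.split(",");
            userId = Long.parseLong(f[0]);
            sessionId = f[1];
            length = Long.parseLong(f[2]);
            timestamp = f[3];
        }

        String key() {
            return userId + "/" + sessionId;
        }
    }

    /** Rolling maxBy("length"): emits the current max row of the event's key for EVERY input row. */
    static List<Row> rollingMaxBy(List<Row> input) {
        Map<String, Row> currentMax = new LinkedHashMap<>();
        List<Row> emitted = new ArrayList<>();
        for (Row r : input) {
            Row prev = currentMax.get(r.key());
            Row max = (prev == null || r.length > prev.length) ? r : prev;
            currentMax.put(r.key(), max);
            emitted.add(max); // one output per input, not one per key
        }
        return emitted;
    }

    /** Desired result: only the final max row per (userId, sessionId). */
    static Map<String, Row> finalMaxPerSession(List<Row> input) {
        Map<String, Row> currentMax = new LinkedHashMap<>();
        for (Row r : input) {
            currentMax.merge(r.key(), r, (a, b) -> b.length > a.length ? b : a);
        }
        return currentMax;
    }

    /** Average session length over the final rows; the cast avoids integer division. */
    static double averageLength(Map<String, Row> sessions) {
        long sum = 0;
        for (Row r : sessions.values()) {
            sum += r.length;
        }
        return sessions.isEmpty() ? 0.0 : (double) sum / sessions.size();
    }
}
```

On the four example rows, `rollingMaxBy` emits four rows (lengths 100, 150, 160, 100), while `finalMaxPerSession` keeps two (160 for user 1, 100 for user 2), whose average is 130.0.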
>>>
>>> Questions:
>>> - How can I collect the data for all groups in the sessions variable?
>>> - Is there another way to achieve this? With my implementation the
>>>   average will be computed on a single node, because sessions is of type
>>>   SingleOutputStreamOperator<Event>.
>>> - Can I use ContinuousEventTimeTrigger to trigger at regular intervals?
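For the last question, the idea behind a continuous event-time trigger is to fire a window repeatedly at a fixed event-time interval as the watermark advances, emitting early (partial) results, and once more at the end of the window. The model below lists those firing times for a single window. It is a simplified sketch under stated assumptions (firing times aligned to multiples of the interval, counted from the first element's timestamp, plus a final firing at the window's last millisecond); the names are hypothetical, and the source of Flink's `ContinuousEventTimeTrigger` is the reference for its exact timer logic.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified plain-Java model of periodic event-time firing within one window. Not Flink code.
 * Assumptions: timers are aligned to multiples of the interval, the first timer is the next
 * multiple after the first element's timestamp, and a final firing happens at end - 1.
 */
class PeriodicFiringSketch {

    static List<Long> firingTimes(long firstElementTs, long windowEndExclusive, long interval) {
        long maxTimestamp = windowEndExclusive - 1; // last millisecond that belongs to the window
        List<Long> fires = new ArrayList<>();
        long next = firstElementTs - (firstElementTs % interval) + interval;
        while (next < maxTimestamp) {
            fires.add(next); // early, partial result for the window
            next += interval;
        }
        fires.add(maxTimestamp); // final result once the watermark passes the window end
        return fires;
    }
}
```

For a 10-minute window starting at 0, a 2-minute interval and a first element at 5 s, it yields firings at 2, 4, 6 and 8 minutes plus one at 599,999 ms.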
>>>
>>> Thanks
>
>
