Thanks for the clarification. On Tue, Jun 7, 2016 at 9:15 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi, > I'm afraid you're running into a bug in the special processing-time > window operator. A suggested workaround would be to switch to the > IngestionTime characteristic and use TumblingEventTimeWindows. > > I also opened a Jira issue for the bug so that we can keep track of it: > https://issues.apache.org/jira/browse/FLINK-4028 > > Cheers, > Aljoscha > > On Tue, 7 Jun 2016 at 14:57 Soumya Simanta <soumya.sima...@gmail.com> > wrote: > >> The problem is why is the window end time in the future? >> >> For example if my window size is 60 seconds and my window is being >> evaluated at 3.00 pm then why is the window end time 3.01 pm and not 3.00 >> pm even when the data that is being evaluated falls in the window 2.59 - >> 3.00. >> >> Sent from my iPhone >> >> On Jun 7, 2016, at 3:47 PM, Chesnay Schepler <ches...@apache.org> wrote: >> >> could you state a specific problem? >> >> On 07.06.2016 06:40, Soumya Simanta wrote: >> >> I've a simple program which takes some inputs from a command line (Socket >> stream) and then aggregates based on the key. >> >> When running this program on my local machine I see some output that is >> counterintuitive to my understanding of windows in Flink. >> >> The start time of the Window is around the time the Functions are being >> evaluated. However, *the window end time is around 60 s (window size) >> after the current time (please see below).* >> >> Can someone explain this behaviour please? 
>> >> import org.apache.flink.api.scala._import >> org.apache.flink.streaming.api.TimeCharacteristicimport >> org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport >> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindowsimport >> org.apache.flink.streaming.api.windowing.time.Timeimport >> org.apache.flink.streaming.api.windowing.windows.TimeWindowimport >> org.apache.flink.util.Collector >> case class EventAgg(start: Long, end: Long, key: String, value: Int) >> object Processor { >> >> val window_length = 60000 // milliseconds def aggregateEvents(key: >> String, window: TimeWindow, in: Iterable[Event], out: Collector[EventAgg]): >> Unit = { >> var sum = 0 for (e <- in) { >> sum = sum + e.value >> } >> val start = window.getStart >> val end = window.getEnd >> val diff = (end - start) >> println(s" windowId: ${window.hashCode()} currenttime: >> ${System.currentTimeMillis()} key:[$key] start: $start end: $end diff: >> $diff") >> >> >> out.collect( >> new EventAgg( >> start = window.getStart, >> end = window.getEnd, >> key = key, >> value = sum >> ) >> ) >> } >> >> def main(Args: Array[String]): Unit = { >> val env = StreamExecutionEnvironment.getExecutionEnvironment >> //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) >> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) >> //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) >> val sevents = env.socketTextStream("localhost", 9000) >> sevents >> .map(x => parseEvent(x)) >> .keyBy(_.key) >> >> .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length))) >> .apply(aggregateEvents(_, _, _, _: Collector[EventAgg])) >> .map("Default Assigner: " + System.currentTimeMillis().toString + " - >> " + _.toString) >> .print() >> >> env.execute("Event time windows") >> } >> >> def parseEvent(s: String): Event = { >> if (s == null || s.trim().length == 0) >> Event("default", 0, 0L) >> else { >> val parts = s.split(",") >> 
Event(parts(0), parts(1).toInt, 1L) >> } >> } >> } >> >> >> *Output* >> >> windowId: -663519360 currenttime: 1465234200007 key:[a] start: >> 1465234200000 end: 1465234260000 diff: 60000 >> windowId: -663519360 currenttime: 1465234200006 key:[b] start: >> 1465234200000 end: 1465234260000 diff: 60000 >> 3> Default Assigner: 1465234200010 - >> EventAgg(1465234200000,1465234260000,a,3) >> 7> Default Assigner: 1465234200010 - >> EventAgg(1465234200000,1465234260000,b,4) >> >> >> >>