Thanks for the clarification.

On Tue, Jun 7, 2016 at 9:15 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> I'm afraid you're running into a bug into the special processing-time
> window operator. A suggested workaround would be to switch to
> characteristic IngestionTime and use TumblingEventTimeWindows.
>
> I also open a Jira issue for the bug so that we can keep track of it:
> https://issues.apache.org/jira/browse/FLINK-4028
>
> Cheers,
> Aljoscha
>
> On Tue, 7 Jun 2016 at 14:57 Soumya Simanta <soumya.sima...@gmail.com>
> wrote:
>
>> The problem is why is the window end time in the future ?
>>
>> For example if my window size is 60 seconds and my window is being
>> evaluated at 3.00 pm then why is the window end time 3.01 pm and not 3.00
>> pm even when the data that is being evaluated falls in the window 2.59 -
>> 3.00.
>>
>> Sent from my iPhone
>>
>> On Jun 7, 2016, at 3:47 PM, Chesnay Schepler <ches...@apache.org> wrote:
>>
>> could you state a specific problem?
>>
>> On 07.06.2016 06:40, Soumya Simanta wrote:
>>
>> I've a simple program which takes some inputs from a command line (Socket
>> stream) and then aggregates based on the key.
>>
>> When running this program on my local machine I see some output that is
>> counter intuitive to my understanding of windows in Flink.
>>
>> The start time of the Window is around the time the Functions are being
>> evaluated. However, *the window end time is around 60 s (window size)
>> after the current time (please see below). *
>>
>> Can someone explain this behaviour please?
>>
>> import org.apache.flink.api.scala._import 
>> org.apache.flink.streaming.api.TimeCharacteristicimport 
>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport 
>> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindowsimport
>>  org.apache.flink.streaming.api.windowing.time.Timeimport 
>> org.apache.flink.streaming.api.windowing.windows.TimeWindowimport 
>> org.apache.flink.util.Collector
>> case class EventAgg(start: Long, end: Long, key: String, value: Int)
>> object Processor {
>>
>>   val window_length = 60000 // milliseconds  def aggregateEvents(key: 
>> String, window: TimeWindow, in: Iterable[Event], out: Collector[EventAgg]): 
>> Unit = {
>>     var sum = 0    for (e <- in) {
>>       sum = sum + e.value
>>     }
>>     val start = window.getStart
>>     val end = window.getEnd
>>     val diff = (end - start)
>>     println(s" windowId: ${window.hashCode()} currenttime: 
>> ${System.currentTimeMillis()} key:[$key] start: $start end: $end diff: 
>> $diff")
>>
>>
>>     out.collect(
>>       new EventAgg(
>>         start = window.getStart,
>>         end = window.getEnd,
>>         key = key,
>>         value = sum
>>       )
>>     )
>>   }
>>
>>   def main(Args: Array[String]): Unit = {
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment    
>> //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)    
>> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>     //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)    
>> val sevents = env.socketTextStream("localhost", 9000)
>>     sevents
>>       .map(x => parseEvent(x))
>>       .keyBy(_.key)
>>       
>> .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
>>       .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
>>       .map("Default Assigner: " + System.currentTimeMillis().toString + " - 
>> " + _.toString)
>>       .print()
>>
>>     env.execute("Event time windows")
>>   }
>>
>>   def parseEvent(s: String): Event = {
>>     if (s == null || s.trim().length == 0)
>>       Event("default", 0, 0L)
>>     else {
>>       val parts = s.split(",")
>>       Event(parts(0), parts(1).toInt, 1L)
>>     }
>>   }
>> }
>>
>>
>> *Output*
>>
>>  windowId: -663519360 currenttime: 1465234200007 key:[a] start:
>> 1465234200000 end: 1465234260000 diff: 60000
>>  windowId: -663519360 currenttime: 1465234200006 key:[b] start:
>> 1465234200000 end: 1465234260000 diff: 60000
>> 3> Default Assigner: 1465234200010 -
>> EventAgg(1465234200000,1465234260000,a,3)
>> 7> Default Assigner: 1465234200010 -
>> EventAgg(1465234200000,1465234260000,b,4)
>>
>>
>>
>>

Reply via email to