Could you state a specific problem?

On 07.06.2016 06:40, Soumya Simanta wrote:
I have a simple program that reads input from a socket stream (fed from the command line) and aggregates it by key.

When I run this program on my local machine, I see output that is counterintuitive given my understanding of windows in Flink.

The start time of the window is around the time the functions are being evaluated. However, *the window end time is around 60 s (the window size) after the current time (please see below).*

Can someone explain this behaviour please?

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

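// Event is not shown in the original message; this definition is an assumption
// inferred from how it is used below (the name of the third field is a guess).
case class Event(key: String, value: Int, timestamp: Long)

case class EventAgg(start: Long, end: Long, key: String, value: Int)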

object Processor {

   val window_length = 60000 // milliseconds

   def aggregateEvents(key: String, window: TimeWindow, in: Iterable[Event], out: Collector[EventAgg]): Unit = {
     var sum = 0
     for (e <- in) {
       sum = sum + e.value
     }
     val start = window.getStart
     val end = window.getEnd
     val diff = (end - start)
     println(s" windowId: ${window.hashCode()} currenttime: ${System.currentTimeMillis()} key:[$key] start: $start end: $end diff: $diff")

     out.collect(
       new EventAgg(
         start = window.getStart,
         end = window.getEnd,
         key = key,
         value = sum
       )
     )
   }

   def main(args: Array[String]): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
     //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
     val sevents = env.socketTextStream("localhost", 9000)
     sevents
       .map(x => parseEvent(x))
       .keyBy(_.key)
       .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
       .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
       .map("Default Assigner: " + System.currentTimeMillis().toString + " - " + _.toString)
       .print()

     env.execute("Event time windows")
   }

   def parseEvent(s: String): Event = {
     if (s == null || s.trim().length == 0)
       Event("default", 0, 0L)
     else {
       val parts = s.split(",")
       Event(parts(0), parts(1).toInt, 1L)
     }
   }
}

Output:

windowId: -663519360 currenttime: 1465234200007 key:[a] start: 1465234200000 end: 1465234260000 diff: 60000
windowId: -663519360 currenttime: 1465234200006 key:[b] start: 1465234200000 end: 1465234260000 diff: 60000
3> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,a,3)
7> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,b,4)
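
As a sanity check on the numbers above, here is a minimal sketch of the window arithmetic. It assumes that tumbling windows are aligned to the epoch with no offset, so that a window start is the timestamp rounded down to a multiple of the window size. The object WindowBounds and the function windowStart are hypothetical names used only for illustration and are not part of the program above. The sketch only reproduces the reported bounds from the printed "currenttime"; it does not say anything about when Flink decides to fire the window.

```scala
object WindowBounds {
  // Assumption: windows are aligned to the epoch (no offset), so the start
  // is the timestamp rounded down to a multiple of the window size.
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - (timestampMs % sizeMs)

  def main(args: Array[String]): Unit = {
    val sizeMs = 60000L
    val evaluatedAt = 1465234200007L // "currenttime" printed for key [a] above
    val start = windowStart(evaluatedAt, sizeMs)
    val end = start + sizeMs
    // prints: start: 1465234200000 end: 1465234260000 diff: 60000
    println(s"start: $start end: $end diff: ${end - start}")
  }
}
```

Under that assumption, the reported window is the one that contains the evaluation time itself, which is why its end lies almost a full window size (60 s) in the future.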