
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.TimestampExtractor
import org.apache.flink.streaming.api.functions.source.{SourceFunction, EventTimeSourceFunction}
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger

object FlinkWindows {

  case class Event(name: String, toSum: Int, timestamp: Long)


  private var hostName: String = null
  private var port: Int = 0

  def main(args: Array[String]) {

//    hostName = args(0)
//    port = args(1).toInt

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //    env.getConfig.setAutoWatermarkInterval(0)



    //Read from a socket stream at map it to StockPrice objects
    //    val stream = env.socketTextStream(hostName, port).map(x => {
    //      val split = x.split(",")
    //      Event(split(0), split(1).toInt, split(2).toLong)
    //    })
    val stream = env.addSource(new EventSource)
      //    val stream = env.fromCollection(genCarStream())

    val withTimestamps = stream.assignTimestamps(new TimestampExtractor[Event] {

      private var currentWatermark: Long = Long.MinValue

      override def getCurrentWatermark: Long = currentWatermark

      override def extractWatermark(element: Event, currentTimestamp: Long): Long = {
        if (element.timestamp > currentWatermark) {
          currentWatermark = element.timestamp - 999
        }
        currentWatermark
      }

      override def extractTimestamp(element: Event, currentTimestamp: Long) = element.timestamp
    })

    val sumed = withTimestamps
      .keyBy("name")
      .window(SlidingTimeWindows.of(
        Time.seconds(3),
        Time.seconds(1)
      ))
      .trigger(EventTimeTrigger.create())
      .sum("toSum")
      .print()



    env.execute("Stock stream")
  }



  class EventSource extends SourceFunction[Event] {

    var isRunning = true
    val offset = System.currentTimeMillis()

    override def cancel(): Unit = isRunning = false

    override def run(ctx: SourceContext[Event]): Unit = {

      var idx = 0
      while (isRunning) {
        Thread.sleep(1500)
        ctx.collect(Event("a", 3, offset + idx * 999))
        ctx.collect(Event("b", 2, offset + idx * 999))
        idx += 1
      }
    }
  }

  class EventSourceWithTimestamp extends EventTimeSourceFunction[Event] {

    var isRunning = true
    val offset = System.currentTimeMillis()

    override def cancel(): Unit = isRunning = false

    override def run(ctx: SourceContext[Event]): Unit = {

      var idx = 0
      while (isRunning) {
        Thread.sleep(1500)
        ctx.collectWithTimestamp(Event("a", 3, offset + idx * 999), offset + idx * 999)
        ctx.collectWithTimestamp(Event("b", 2, offset + idx * 999), offset + idx * 999)
        ctx.emitWatermark(new Watermark(offset + (idx - 1) * 999))
        idx += 1
      }
    }
  }

}