Hi,
I have recently experimented a bit with windowing and event-time mechanism
in flink and either I do not understand how should it work or there is some
kind of a bug.
I have prepared two Source Functions. One that emits watermark itself and
one that does not, but I have prepared a TimestampExtractor that should
produce same results that the previous Source Function, at least from my
point of view.
Afterwards I've prepared a simple summing over an EventTimeTriggered
Sliding Window.
What I expected is a sum of 3*(t_sum) property of Event regardless of the
sleep time in Source Function. That is how the EventTimeSourceFunction
works, but for the SourceFunction it depends on the sleep and does not
equals 3*(t_sum).
I have done some debugging and for the SourceFunction the output of
ExtractTimestampsOperator does not chain to the aggregator operator(the
property output.allOutputs is empty).
Do I understand the mechanism correctly and should my code work as I
described? If not could you please explain a little bit? The code I've
attached to this email.
I would be grateful.
Regards
Dawid Wysakowicz
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.TimestampExtractor
import org.apache.flink.streaming.api.functions.source.{SourceFunction, EventTimeSourceFunction}
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
object FlinkWindows {
case class Event(name: String, toSum: Int, timestamp: Long)
private var hostName: String = null
private var port: Int = 0
def main(args: Array[String]) {
hostName = args(0)
port = args(1).toInt
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// env.getConfig.setAutoWatermarkInterval(0)
//Read from a socket stream at map it to StockPrice objects
// val stream = env.socketTextStream(hostName, port).map(x => {
// val split = x.split(",")
// Event(split(0), split(1).toInt, split(2).toLong)
// })
val stream = env.addSource(new EventSourceWithTimestamp)
// val stream = env.fromCollection(genCarStream())
.keyBy("name")
stream.assignTimestamps(new TimestampExtractor[Event] {
private var currentWatermark: Long = Long.MinValue
override def getCurrentWatermark: Long = currentWatermark
override def extractWatermark(element: Event, currentTimestamp: Long): Long = {
if (element.timestamp > currentWatermark) {
currentWatermark = element.timestamp - 999
}
currentWatermark
}
override def extractTimestamp(element: Event, currentTimestamp: Long): Long = element.timestamp
})
val sumed = stream
.window(SlidingTimeWindows.of(
Time.of(3, TimeUnit.SECONDS),
Time.of(1, TimeUnit.SECONDS)
))
.trigger(EventTimeTrigger.create())
.sum("toSum")
.print()
env.execute("Stock stream")
}
class EventSource extends SourceFunction[Event] {
var isRunning = true
val offset = System.currentTimeMillis()
override def cancel(): Unit = isRunning = false
override def run(ctx: SourceContext[Event]): Unit = {
var idx = 0
while (isRunning) {
Thread.sleep(1500)
ctx.collect(Event("a", 3, offset + idx * 999))
ctx.collect(Event("b", 2, offset + idx * 999))
idx += 1
}
}
}
class EventSourceWithTimestamp extends EventTimeSourceFunction[Event] {
var isRunning = true
val offset = System.currentTimeMillis()
override def cancel(): Unit = isRunning = false
override def run(ctx: SourceContext[Event]): Unit = {
var idx = 0
while (isRunning) {
Thread.sleep(1500)
ctx.collectWithTimestamp(Event("a", 3, offset + idx * 999), offset + idx * 999)
ctx.collectWithTimestamp(Event("b", 2, offset + idx * 999), offset + idx * 999)
ctx.emitWatermark(new Watermark(offset + (idx - 1) * 999))
idx += 1
}
}
}
}