What are the standard approaches for testing a streaming algorithm? So far I have only been able to come up with the approach below, in which I
/*
 * 1) create a data source that emits events in bunches with set times so that I know the events
 *    will be in the same window,
 * 2) end the stream with a mapWithState where the state checks if the expected elements pass by
 *    in the expected order.
 *
 * This does not seem like the most robust way of doing this. Suggestions?
 *
 * Best, Bart
 */

import java.io.{FileWriter, StringWriter}
import java.util.{Calendar, Date, Properties}
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer09, FlinkKafkaProducer09}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.slf4j.{Logger, LoggerFactory}
import scala.math.max
import scala.util.{Random, hashing}
import java.time

/**
 * Experimental Flink job for checking a windowed computation end to end.
 *
 * A custom source emits the integers 1 to 30 in bursts, the stream is summed over 10-second time
 * windows, and a stateful map at the end of the pipeline prints whether the first window sums
 * match the expected values.
 */
object SessionizePageviewsTT {
  val logger: Logger = LoggerFactory.getLogger("SessionizePageviewsTT") // classOf doesn't work on an object

  /** Returns the current wall-clock time in milliseconds, as reported by `System.currentTimeMillis`. */
  def get_now_ms(): Long = System.currentTimeMillis()

  /**
   * Builds and runs the pipeline: burst source -> 10-second window sum -> stateful check -> print.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val totalEvents = 30    // the source emits the values 1..totalEvents
    val burstSize = 10      // a pause is inserted before every value divisible by this
    val pauseMillis = 10000L

    /**
     * Source body: emits 1..30, each tagged with a timestamp. Before emitting 10, 20 and 30 it
     * sleeps for 10 seconds and refreshes the timestamp, so the values go out as the bursts
     * 1-9, 10-19, 20-29 and 30, every element of a burst carrying the same timestamp.
     *
     * NOTE(review): the attached timestamps presumably only drive windowing when the job runs
     * with event time; no time characteristic is configured here, so confirm which notion of
     * time `timeWindow` uses in the Flink version at hand.
     */
    def sourceFF(scx: SourceContext[Int]): Unit = {
      var cur = 1
      var now: Long = get_now_ms()
      while (cur <= totalEvents) {
        // every 10 wait 10 seconds and then burst a bunch
        if (cur % burstSize == 0) {
          Thread.sleep(pauseMillis)
          now = get_now_ms()
        }
        println(s"emiting: $cur, $now")
        scx.collectWithTimestamp(cur, now)
        cur += 1
      }
    }

    val x: DataStream[Int] = env.addSource(sourceFF _)

    // Expected sums of the first two bursts: 1+...+9 = 45 and 10+...+19 = 145.
    val vals = List(45, 145)

    /**
     * Stateful check applied to each window result.
     *
     * @param xy window result; `_1` is the summed value compared against `vals`
     * @param s  number of results already seen (None on the first call)
     * @return the unchanged result, plus the incremented counter as the new state
     */
    def checkFF(xy: (Int, Int), s: Option[Int]): ((Int, Int), Option[Int]) = {
      val idx = s.getOrElse(0)
      // Only the first vals.length results are checked; later ones pass through silently.
      vals.lift(idx).foreach { expected =>
        if (xy._1 == expected) println("all ok") else println("error error")
      }
      (xy, Some(idx + 1))
    }

    x.map(value => (value, 1))
      .keyBy(1)                       // tuple field 1 is the constant 1, so every element shares one key
      .timeWindow(Time.seconds(10))
      .sum(0)                         // sum the emitted values (tuple field 0) per window
      .keyBy(_ => 1)                  // single key again so checkFF sees all results with one state
      .mapWithState(checkFF)
      .print()

    env.execute("XXXX")
  }
}